http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/awsbatch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/awsbatch_operator.py b/airflow/contrib/operators/awsbatch_operator.py index 920fdb7..6e8e2a4 100644 --- a/airflow/contrib/operators/awsbatch_operator.py +++ b/airflow/contrib/operators/awsbatch_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,9 +46,11 @@ class AWSBatchOperator(BaseOperator): :param max_retries: exponential backoff retries while waiter is not merged :type max_retries: int :param aws_conn_id: connection id of AWS credentials / region name. If None, - credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html). + credential boto3 strategy will be used + (http://boto3.readthedocs.io/en/latest/guide/configuration.html). :type aws_conn_id: str - :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided) + :param region_name: region name to use in AWS Hook. + Override the region_name in connection (if provided) """ ui_color = '#c3dae0' @@ -152,9 +154,12 @@ class AWSBatchOperator(BaseOperator): if (job['status'] == 'FAILED' or container['container']['exitCode'] != 0): print("@@@@") - raise AirflowException('This containers encounter an error during execution {}'.format(job)) + raise AirflowException( + 'This containers encounter an error during ' + 'execution {}'.format(job)) elif job['status'] is not 'SUCCEEDED': - raise AirflowException('This task is still pending {}'.format(job['status'])) + raise AirflowException( + 'This task is still pending {}'.format(job['status'])) def get_hook(self): return AwsHook(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py index 799d6d5..59ef5d3 100644 --- a/airflow/contrib/operators/bigquery_check_operator.py +++ b/airflow/contrib/operators/bigquery_check_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,7 +18,8 @@ # under the License. from airflow.contrib.hooks.bigquery_hook import BigQueryHook -from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator +from airflow.operators.check_operator import \ + CheckOperator, ValueCheckOperator, IntervalCheckOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_get_data.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py index aabfddd..ab8f71b 100644 --- a/airflow/contrib/operators/bigquery_get_data.py +++ b/airflow/contrib/operators/bigquery_get_data.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 04ddb6a..dc5edb3 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_table_delete_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py index 8751dfe..a16107d 100644 --- a/airflow/contrib/operators/bigquery_table_delete_operator.py +++ b/airflow/contrib/operators/bigquery_table_delete_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_bigquery.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py index b03511a..93a52b3 100644 --- a/airflow/contrib/operators/bigquery_to_bigquery.py +++ b/airflow/contrib/operators/bigquery_to_bigquery.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/bigquery_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py index 166d05e..e2ce930 100644 --- a/airflow/contrib/operators/bigquery_to_gcs.py +++ b/airflow/contrib/operators/bigquery_to_gcs.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/databricks_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py index 8d22d4c..7b8d522 100644 --- a/airflow/contrib/operators/databricks_operator.py +++ b/airflow/contrib/operators/databricks_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -205,13 +205,14 @@ class DatabricksSubmitRunOperator(BaseOperator): Coerces content or all values of content if it is a dict to a string. The function will throw if content contains non-string or non-numeric types. - The reason why we have this function is because the ``self.json`` field must be a dict - with only string values. This is because ``render_template`` will fail for numerical values. + The reason why we have this function is because the ``self.json`` field must be a + dict with only string values. This is because ``render_template`` will fail + for numerical values. """ c = self._deep_string_coerce if isinstance(content, six.string_types): return content - elif isinstance(content, six.integer_types+(float,)): + elif isinstance(content, six.integer_types + (float,)): # Databricks can tolerate either numeric or string types in the API backend. return str(content) elif isinstance(content, (list, tuple)): @@ -221,8 +222,8 @@ class DatabricksSubmitRunOperator(BaseOperator): for k, v in list(content.items())} else: param_type = type(content) - msg = 'Type {0} used for parameter {1} is not a number or a string' \ - .format(param_type, json_path) + msg = 'Type {0} used for parameter {1} is not a number or a string'\ + .format(param_type, json_path) raise AirflowException(msg) def _log_run_page_url(self, url): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataflow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py index 11adc60..e3c8c1f 100644 --- a/airflow/contrib/operators/dataflow_operator.py +++ b/airflow/contrib/operators/dataflow_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 22db3e6..4973eb1 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -249,13 +249,12 @@ class DataprocClusterCreateOperator(BaseOperator): self.project_id, self.zone ) master_type_uri = \ - "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format( - self.project_id, self.zone, self.master_machine_type - ) + "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\ + .format(self.project_id, self.zone, self.master_machine_type) worker_type_uri = \ - "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format( - self.project_id, self.zone, self.worker_machine_type - ) + "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}"\ + .format(self.project_id, self.zone, self.worker_machine_type) + cluster_data = { 'projectId': self.project_id, 'clusterName': self.cluster_name, @@ -297,7 +296,7 @@ class DataprocClusterCreateOperator(BaseOperator): # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows # semantic versioning spec: x.y.z). cluster_data['labels'].update({'airflow-version': - 'v' + version.replace('.', '-').replace('+','-')}) + 'v' + version.replace('.', '-').replace('+', '-')}) if self.storage_bucket: cluster_data['config']['configBucket'] = self.storage_bucket if self.metadata: @@ -305,7 +304,8 @@ class DataprocClusterCreateOperator(BaseOperator): if self.network_uri: cluster_data['config']['gceClusterConfig']['networkUri'] = self.network_uri if self.subnetwork_uri: - cluster_data['config']['gceClusterConfig']['subnetworkUri'] = self.subnetwork_uri + cluster_data['config']['gceClusterConfig']['subnetworkUri'] = \ + self.subnetwork_uri if self.tags: cluster_data['config']['gceClusterConfig']['tags'] = self.tags if self.image_version: @@ -332,10 +332,10 @@ class DataprocClusterCreateOperator(BaseOperator): cluster_data['config']['initializationActions'] = init_actions_dict if self.service_account: cluster_data['config']['gceClusterConfig']['serviceAccount'] =\ - self.service_account + self.service_account if self.service_account_scopes: cluster_data['config']['gceClusterConfig']['serviceAccountScopes'] =\ - self.service_account_scopes + self.service_account_scopes return cluster_data def execute(self, context): @@ -368,7 +368,7 @@ class DataprocClusterCreateOperator(BaseOperator): self.log.info( 'Cluster {} already exists... Checking status...', self.cluster_name - ) + ) self._wait_for_done(service) return True else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_export_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py index 09b7965..f6dc7cc 100644 --- a/airflow/contrib/operators/datastore_export_operator.py +++ b/airflow/contrib/operators/datastore_export_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,15 +35,16 @@ class DatastoreExportOperator(BaseOperator): :type namespace: str :param datastore_conn_id: the name of the Datastore connection id to use :type datastore_conn_id: string - :param cloud_storage_conn_id: the name of the cloud storage connection id to force-write - backup + :param cloud_storage_conn_id: the name of the cloud storage connection id to + force-write backup :type cloud_storage_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string - :param entity_filter: description of what data from the project is included in the export, - refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter + :param entity_filter: description of what data from the project is included in the + export, refer to + https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter :type entity_filter: dict :param labels: client-assigned labels for cloud storage :type labels: dict @@ -92,7 +93,7 @@ class DatastoreExportOperator(BaseOperator): for o in objects: gcs_hook.delete(self.bucket, o) - ds_hook = DatastoreHook(self.datastore_conn_id,self.delegate_to) + ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to) result = ds_hook.export_to_storage_bucket(bucket=self.bucket, namespace=self.namespace, entity_filter=self.entity_filter, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/datastore_import_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py index 279b1a5..401d36e 100644 --- a/airflow/contrib/operators/datastore_import_operator.py +++ b/airflow/contrib/operators/datastore_import_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -35,8 +35,9 @@ class DatastoreImportOperator(BaseOperator): :param namespace: optional namespace of the backup metadata file in the specified Cloud Storage bucket. :type namespace: str - :param entity_filter: description of what data from the project is included in the export, - refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter + :param entity_filter: description of what data from the project is included in + the export, refer to + https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter :type entity_filter: dict :param labels: client-assigned labels for cloud storage :type labels: dict http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/discord_webhook_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/discord_webhook_operator.py b/airflow/contrib/operators/discord_webhook_operator.py index acfba58..6b44eac 100644 --- a/airflow/contrib/operators/discord_webhook_operator.py +++ b/airflow/contrib/operators/discord_webhook_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/druid_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py index c0cb09d..426393d 100644 --- a/airflow/contrib/operators/druid_operator.py +++ b/airflow/contrib/operators/druid_operator.py @@ -29,18 +29,17 @@ class DruidOperator(BaseOperator): :param json_index_file: The filepath to the druid index specification :type json_index_file: str - :param druid_ingest_conn_id: The connection id of the Druid overlord which accepts index jobs + :param druid_ingest_conn_id: The connection id of the Druid overlord which + accepts index jobs :type druid_ingest_conn_id: str """ template_fields = ('index_spec_str',) template_ext = ('.json',) - def __init__( - self, - json_index_file, - druid_ingest_conn_id='druid_ingest_default', - max_ingestion_time=None, - *args, **kwargs): + def __init__(self, json_index_file, + druid_ingest_conn_id='druid_ingest_default', + max_ingestion_time=None, + *args, **kwargs): super(DruidOperator, self).__init__(*args, **kwargs) self.conn_id = druid_ingest_conn_id self.max_ingestion_time = max_ingestion_time http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ecs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py index 1016fca..60540f5 100644 --- a/airflow/contrib/operators/ecs_operator.py +++ b/airflow/contrib/operators/ecs_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,9 +37,11 @@ class ECSOperator(BaseOperator): http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task :type: overrides: dict :param aws_conn_id: connection id of AWS credentials / region name. If None, - credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html). + credential boto3 strategy will be used + (http://boto3.readthedocs.io/en/latest/guide/configuration.html). :type aws_conn_id: str - :param region_name: region name to use in AWS Hook. Override the region_name in connection (if provided) + :param region_name: region name to use in AWS Hook. + Override the region_name in connection (if provided) :param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE') :type: launch_type: str """ @@ -66,7 +68,7 @@ class ECSOperator(BaseOperator): def execute(self, context): self.log.info( 'Running ECS Task - Task definition: %s - on cluster %s', - self.task_definition,self.cluster + self.task_definition, self.cluster ) self.log.info('ECSOperator overrides: %s', self.overrides) @@ -115,13 +117,16 @@ class ECSOperator(BaseOperator): for task in response['tasks']: containers = task['containers'] for container in containers: - if container.get('lastStatus') == 'STOPPED' and container['exitCode'] != 0: - raise AirflowException('This task is not in success state {}'.format(task)) + if container.get('lastStatus') == 'STOPPED' and \ + container['exitCode'] != 0: + raise AirflowException( + 'This task is not in success state {}'.format(task)) elif container.get('lastStatus') == 'PENDING': raise AirflowException('This task is still pending {}'.format(task)) elif 'error' in container.get('reason', '').lower(): - raise AirflowException('This containers encounter an error during launching : {}'. - format(container.get('reason', '').lower())) + raise AirflowException( + 'This containers encounter an error during launching : {}'. + format(container.get('reason', '').lower())) def get_hook(self): return AwsHook( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_add_steps_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py index 3d2d510..643ffe9 100644 --- a/airflow/contrib/operators/emr_add_steps_operator.py +++ b/airflow/contrib/operators/emr_add_steps_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_create_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py index e90c77a..89be12f 100644 --- a/airflow/contrib/operators/emr_create_job_flow_operator.py +++ b/airflow/contrib/operators/emr_create_job_flow_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,7 +25,8 @@ from airflow.exceptions import AirflowException class EmrCreateJobFlowOperator(BaseOperator): """ Creates an EMR JobFlow, reading the config from the EMR connection. - A dictionary of JobFlow overrides can be passed that override the config from the connection. + A dictionary of JobFlow overrides can be passed that override + the config from the connection. :param aws_conn_id: aws connection to uses :type aws_conn_id: str http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/emr_terminate_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py index 95c3164..50407a1 100644 --- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py +++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py index b65e3b3..807385b 100644 --- a/airflow/contrib/operators/file_to_gcs.py +++ b/airflow/contrib/operators/file_to_gcs.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,8 +65,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator): Uploads the file to Google cloud storage """ hook = GoogleCloudStorageHook( - google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, - delegate_to=self.delegate_to) + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to) hook.upload( bucket=self.bucket, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/file_to_wasb.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py index f55c0c1..797960c 100644 --- a/airflow/contrib/operators/file_to_wasb.py +++ b/airflow/contrib/operators/file_to_wasb.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -57,6 +57,8 @@ class FileToWasbOperator(BaseOperator): """Upload a file to Azure Blob Storage.""" hook = WasbHook(wasb_conn_id=self.wasb_conn_id) self.log.info( - 'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals()) + 'Uploading {self.file_path} to wasb://{self.container_name} ' + 'as {self.blob_name}'.format(**locals()) ) - hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options) + hook.load_file(self.file_path, self.container_name, + self.blob_name, **self.load_options) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_download_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py index 7dfa96c..ce272ae 100644 --- a/airflow/contrib/operators/gcs_download_operator.py +++ b/airflow/contrib/operators/gcs_download_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -46,7 +46,8 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): connecting to Google cloud storage. :type google_cloud_storage_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide delegation enabled. + For this to work, the service account making the request must have + domain-wide delegation enabled. :type delegate_to: string """ template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',) @@ -84,5 +85,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): if sys.getsizeof(file_bytes) < 48000: context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes) else: - raise RuntimeError('The size of the downloaded file is too large to push to XCom!') + raise RuntimeError( + 'The size of the downloaded file is too large to push to XCom!' + ) self.log.debug(file_bytes) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_list_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py index 393deb9..6474453 100644 --- a/airflow/contrib/operators/gcs_list_operator.py +++ b/airflow/contrib/operators/gcs_list_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_operator.py b/airflow/contrib/operators/gcs_operator.py index 3d339d8..ef5e8de 100644 --- a/airflow/contrib/operators/gcs_operator.py +++ b/airflow/contrib/operators/gcs_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 938da4e..3a77980 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -184,8 +184,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) - if not self.schema_fields and self.schema_object \ - and self.source_format != 'DATASTORE_BACKUP': + if not self.schema_fields and \ + self.schema_object and \ + self.source_format != 'DATASTORE_BACKUP': gcs_hook = GoogleCloudStorageHook( google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, delegate_to=self.delegate_to) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/gcs_to_s3.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py index 00c5e14..a87aa3a 100644 --- a/airflow/contrib/operators/gcs_to_s3.py +++ b/airflow/contrib/operators/gcs_to_s3.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hipchat_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py index 5dd06f6..381cd72 100644 --- a/airflow/contrib/operators/hipchat_operator.py +++ b/airflow/contrib/operators/hipchat_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/hive_to_dynamodb.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/hive_to_dynamodb.py b/airflow/contrib/operators/hive_to_dynamodb.py index 2b6f88f..4a39e40 100644 --- a/airflow/contrib/operators/hive_to_dynamodb.py +++ b/airflow/contrib/operators/hive_to_dynamodb.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jenkins_job_trigger_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/jenkins_job_trigger_operator.py b/airflow/contrib/operators/jenkins_job_trigger_operator.py index ad59797..37185f3 100644 --- a/airflow/contrib/operators/jenkins_job_trigger_operator.py +++ b/airflow/contrib/operators/jenkins_job_trigger_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/jira_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/jira_operator.py b/airflow/contrib/operators/jira_operator.py index 64869e4..01d78b8 100644 --- a/airflow/contrib/operators/jira_operator.py +++ b/airflow/contrib/operators/jira_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 391ec58..8e88b68 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -29,6 +29,7 @@ template_fields = ('templates_dict',) template_ext = tuple() ui_color = '#ffefeb' + class KubernetesPodOperator(BaseOperator): """ Execute a task in a Kubernetes Pod http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/mlengine_operator_utils.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/mlengine_operator_utils.py b/airflow/contrib/operators/mlengine_operator_utils.py index 5fda6ae..7ce784e 100644 --- a/airflow/contrib/operators/mlengine_operator_utils.py +++ b/airflow/contrib/operators/mlengine_operator_utils.py @@ -28,6 +28,7 @@ from airflow.exceptions import AirflowException from airflow.operators.python_operator import PythonOperator from six.moves.urllib.parse import urlsplit + def create_evaluate_ops(task_prefix, data_format, input_paths, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/postgres_to_gcs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py index 9eac70d..88b4d00 100644 --- a/airflow/contrib/operators/postgres_to_gcs_operator.py +++ b/airflow/contrib/operators/postgres_to_gcs_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/pubsub_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/pubsub_operator.py b/airflow/contrib/operators/pubsub_operator.py index 42586aa..2d55b19 100644 --- a/airflow/contrib/operators/pubsub_operator.py +++ b/airflow/contrib/operators/pubsub_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index 47ccf53..88e8bdf 100755 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/s3_list_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/s3_list_operator.py b/airflow/contrib/operators/s3_list_operator.py index df448ad..b85691b 100644 --- a/airflow/contrib/operators/s3_list_operator.py +++ b/airflow/contrib/operators/s3_list_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sftp_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py index 519dc1d..153f440 100644 --- a/airflow/contrib/operators/sftp_operator.py +++ b/airflow/contrib/operators/sftp_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -65,7 +65,8 @@ class SFTPOperator(BaseOperator): self.local_filepath = local_filepath self.remote_filepath = remote_filepath self.operation = operation - if not (self.operation.lower() == SFTPOperation.GET or self.operation.lower() == SFTPOperation.PUT): + if not (self.operation.lower() == SFTPOperation.GET or + self.operation.lower() == SFTPOperation.PUT): raise TypeError("unsupported operation value {0}, expected {1} or {2}" .format(self.operation, SFTPOperation.GET, SFTPOperation.PUT)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/slack_webhook_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/slack_webhook_operator.py b/airflow/contrib/operators/slack_webhook_operator.py index 124b249..bab3e90 100644 --- a/airflow/contrib/operators/slack_webhook_operator.py +++ b/airflow/contrib/operators/slack_webhook_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/snowflake_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/snowflake_operator.py b/airflow/contrib/operators/snowflake_operator.py index 25e9bf4..39d7d49 100644 --- a/airflow/contrib/operators/snowflake_operator.py +++ b/airflow/contrib/operators/snowflake_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_jdbc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_jdbc_operator.py b/airflow/contrib/operators/spark_jdbc_operator.py index c915aaa..42f9dd5 100644 --- a/airflow/contrib/operators/spark_jdbc_operator.py +++ b/airflow/contrib/operators/spark_jdbc_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/spark_sql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py index 864c137..825a367 100644 --- a/airflow/contrib/operators/spark_sql_operator.py +++ b/airflow/contrib/operators/spark_sql_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,11 @@ class SparkSqlOperator(BaseOperator): :type conf: str (format: PROP=VALUE) :param conn_id: connection_id string :type conn_id: str - :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker) + :param total_executor_cores: (Standalone & Mesos only) Total cores for all + executors (Default: all the available cores on the worker) :type total_executor_cores: int - :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2) + :param executor_cores: (Standalone & YARN only) Number of cores per + executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G) :type executor_memory: str http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/sqoop_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py index 05996d8..fa61ca1 100644 --- a/airflow/contrib/operators/sqoop_operator.py +++ b/airflow/contrib/operators/sqoop_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,12 +33,16 @@ from airflow.utils.decorators import apply_defaults class SqoopOperator(BaseOperator): """ Execute a Sqoop job. - Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html. + Documentation for Apache Sqoop can be found here: + https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html. """ - template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by', - 'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table', - 'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by', - 'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver', + template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', + 'file_type', 'columns', 'split_by', + 'where', 'export_dir', 'input_null_string', + 'input_null_non_string', 'staging_table', + 'enclosed_by', 'escaped_by', 'input_fields_terminated_by', + 'input_lines_terminated_by', 'input_optionally_enclosed_by', + 'properties', 'extra_import_options', 'driver', 'extra_export_options', 'hcatalog_database', 'hcatalog_table',) ui_color = '#7D8CA4' @@ -115,7 +119,8 @@ class SqoopOperator(BaseOperator): :param relaxed_isolation: use read uncommitted isolation level :param hcatalog_database: Specifies the database name for the HCatalog table :param hcatalog_table: The argument value for this option is the HCatalog table - :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not + :param create_hcatalog_table: Have sqoop create the hcatalog table passed + in or not :param properties: additional JVM properties passed to sqoop :param extra_import_options: Extra import options to pass as dict. If a key doesn't have a value, just pass an empty string to it. @@ -189,7 +194,8 @@ class SqoopOperator(BaseOperator): extra_export_options=self.extra_export_options) elif self.cmd_type == 'import': # add create hcatalog table to extra import options if option passed - # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param + # if new params are added to constructor can pass them in here + # so don't modify sqoop_hook for each param if self.create_hcatalog_table: self.extra_import_options['create-hcatalog-table'] = '' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/ssh_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py index b5ab2dc..d246800 100644 --- a/airflow/contrib/operators/ssh_operator.py +++ b/airflow/contrib/operators/ssh_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -109,7 +109,9 @@ class SSHOperator(BaseOperator): agg_stdout += stdout.channel.recv(stdout_buffer_length) # read from both stdout and stderr - while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): + while not channel.closed or \ + channel.recv_ready() or \ + channel.recv_stderr_ready(): readq, _, _ = select([channel], [], [], self.timeout) for c in readq: if c.recv_ready(): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py index c2a6efb..41072ff 100644 --- a/airflow/contrib/operators/vertica_operator.py +++ b/airflow/contrib/operators/vertica_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py index 40166fa..00fdb64 100644 --- a/airflow/contrib/operators/vertica_to_hive.py +++ b/airflow/contrib/operators/vertica_to_hive.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -94,7 +94,9 @@ class VerticaToHiveTransfer(BaseOperator): @classmethod def type_map(cls, vertica_type): # vertica-python datatype.py donot provied the full type mapping access. - # Manual hack. Reference: https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py + # Manual hack. + # Reference: + # https://github.com/uber/vertica-python/blob/master/vertica_python/vertica/column.py d = { 5: 'BOOLEAN', 6: 'INT', @@ -120,7 +122,8 @@ class VerticaToHiveTransfer(BaseOperator): for field in cursor.description: col_count += 1 col_position = "Column{position}".format(position=col_count) - field_dict[col_position if field[0] == '' else field[0]] = self.type_map(field[1]) + field_dict[col_position if field[0] == '' else field[0]] = \ + self.type_map(field[1]) csv_writer.writerows(cursor.iterate()) f.flush() cursor.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/operators/vertica_to_mysql.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_to_mysql.py b/airflow/contrib/operators/vertica_to_mysql.py index 22af0ce..9d3fe72 100644 --- a/airflow/contrib/operators/vertica_to_mysql.py +++ b/airflow/contrib/operators/vertica_to_mysql.py @@ -18,6 +18,7 @@ # under the License. import logging +import MySQLdb from airflow.contrib.hooks.vertica_hook import VerticaHook from airflow.hooks.mysql_hook import MySqlHook @@ -60,7 +61,7 @@ class VerticaToMySqlTransfer(BaseOperator): """ template_fields = ('sql', 'mysql_table', 'mysql_preoperator', - 'mysql_postoperator') + 'mysql_postoperator') template_ext = ('.sql',) ui_color = '#a0e08c' @@ -102,7 +103,9 @@ class VerticaToMySqlTransfer(BaseOperator): if self.bulk_load: tmpfile = NamedTemporaryFile("w") - logging.info("Selecting rows from Vertica to local file " + str(tmpfile.name) + "...") + logging.info( + "Selecting rows from Vertica to local file " + str( + tmpfile.name) + "...") logging.info(self.sql) csv_writer = csv.writer(tmpfile, delimiter='\t', encoding='utf-8') @@ -129,14 +132,20 @@ class VerticaToMySqlTransfer(BaseOperator): logging.info("Bulk inserting rows into MySQL...") with closing(mysql.get_conn()) as conn: with closing(conn.cursor()) as cursor: - cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO TABLE %s LINES TERMINATED BY '\r\n' (%s)" % (tmpfile.name, self.mysql_table, ", ".join(selected_columns))) + cursor.execute("LOAD DATA LOCAL INFILE '%s' INTO " + "TABLE %s LINES TERMINATED BY '\r\n' (%s)" % + (tmpfile.name, + self.mysql_table, + ", ".join(selected_columns))) conn.commit() tmpfile.close() else: logging.info("Inserting rows into MySQL...") - mysql.insert_rows(table=self.mysql_table, rows=result, target_fields=selected_columns) + mysql.insert_rows(table=self.mysql_table, + rows=result, + target_fields=selected_columns) logging.info("Inserted rows into MySQL " + str(count)) - except: + except (MySQLdb.Error, MySQLdb.Warning): logging.error("Inserted rows into MySQL 0") raise http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/plugins/metastore_browser/main.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/plugins/metastore_browser/main.py b/airflow/contrib/plugins/metastore_browser/main.py index 9a47ef5..836e531 100644 --- a/airflow/contrib/plugins/metastore_browser/main.py +++ b/airflow/contrib/plugins/metastore_browser/main.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -138,7 +138,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin): """.format(where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT) h = MySqlHook(METASTORE_MYSQL_CONN_ID) d = [ - {'id': row[0], 'text': row[0]} + {'id': row[0], 'text': row[0]} for row in h.get_records(sql)] return json.dumps(d) @@ -161,6 +161,7 @@ class MetastoreBrowserView(BaseView, wwwutils.DataProfilingMixin): h = HiveCliHook(HIVE_CLI_CONN_ID) return h.run_cli(sql) + v = MetastoreBrowserView(category="Plugins", name="Hive Metadata Browser") # Creating a flask blueprint to intergrate the templates and static folder http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/__init__.py b/airflow/contrib/sensors/__init__.py index 4067cc7..114d189 100644 --- a/airflow/contrib/sensors/__init__.py +++ b/airflow/contrib/sensors/__init__.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/aws_redshift_cluster_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py index 3b130af..6709524 100644 --- a/airflow/contrib/sensors/aws_redshift_cluster_sensor.py +++ b/airflow/contrib/sensors/aws_redshift_cluster_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bash_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/bash_sensor.py b/airflow/contrib/sensors/bash_sensor.py index 965c13c..26fbb06 100644 --- a/airflow/contrib/sensors/bash_sensor.py +++ b/airflow/contrib/sensors/bash_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -66,7 +66,6 @@ class BashSensor(BaseSensorOperator): self.log.info("Tmp dir root location: \n %s", gettempdir()) with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: - f.write(bytes(bash_command, 'utf_8')) f.flush() fname = f.name http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/bigquery_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py index fe75548..2e496f6 100644 --- a/airflow/contrib/sensors/bigquery_sensor.py +++ b/airflow/contrib/sensors/bigquery_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,37 +24,36 @@ from airflow.utils.decorators import apply_defaults class BigQueryTableSensor(BaseSensorOperator): """ Checks for the existence of a table in Google Bigquery. - """ - template_fields = ('project_id', 'dataset_id', 'table_id',) - ui_color = '#f0eee4' - - @apply_defaults - def __init__( - self, - project_id, - dataset_id, - table_id, - bigquery_conn_id='bigquery_default_conn', - delegate_to=None, - *args, - **kwargs): - """ - Create a new BigQueryTableSensor. - :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook - must provide access to the specified project. + :param project_id: The Google cloud project in which to look for the table. + The connection supplied to the hook must provide + access to the specified project. :type project_id: string :param dataset_id: The name of the dataset in which to look for the table. storage bucket. :type dataset_id: string :param table_id: The name of the table to check the existence of. :type table_id: string - :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery. + :param bigquery_conn_id: The connection ID to use when connecting to + Google BigQuery. :type bigquery_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide delegation enabled. + For this to work, the service account making the request must + have domain-wide delegation enabled. :type delegate_to: string - """ + """ + template_fields = ('project_id', 'dataset_id', 'table_id',) + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + project_id, + dataset_id, + table_id, + bigquery_conn_id='bigquery_default_conn', + delegate_to=None, + *args, **kwargs): + super(BigQueryTableSensor, self).__init__(*args, **kwargs) self.project_id = project_id self.dataset_id = dataset_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/datadog_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py index 9dad505..fb455d6 100644 --- a/airflow/contrib/sensors/datadog_sensor.py +++ b/airflow/contrib/sensors/datadog_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py index edb420e..abeb421 100644 --- a/airflow/contrib/sensors/emr_base_sensor.py +++ b/airflow/contrib/sensors/emr_base_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_job_flow_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py index c33dbad..31d16a0 100644 --- a/airflow/contrib/sensors/emr_job_flow_sensor.py +++ b/airflow/contrib/sensors/emr_job_flow_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,7 +30,8 @@ class EmrJobFlowSensor(EmrBaseSensor): :type job_flow_id: string """ - NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING'] + NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', + 'WAITING', 'TERMINATING'] FAILED_STATE = ['TERMINATED_WITH_ERRORS'] template_fields = ['job_flow_id'] template_ext = () http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py index bd76969..3dddf01 100644 --- a/airflow/contrib/sensors/emr_step_sensor.py +++ b/airflow/contrib/sensors/emr_step_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/file_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/file_sensor.py b/airflow/contrib/sensors/file_sensor.py index 2e3a3fd..3f7bb24 100644 --- a/airflow/contrib/sensors/file_sensor.py +++ b/airflow/contrib/sensors/file_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/ftp_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py index efdedd7..f0a4928 100644 --- a/airflow/contrib/sensors/ftp_sensor.py +++ b/airflow/contrib/sensors/ftp_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/gcs_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py index 036d4f0..23cd760 100644 --- a/airflow/contrib/sensors/gcs_sensor.py +++ b/airflow/contrib/sensors/gcs_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,21 +24,7 @@ from airflow.utils.decorators import apply_defaults class GoogleCloudStorageObjectSensor(BaseSensorOperator): """ Checks for the existence of a file in Google Cloud Storage. - """ - template_fields = ('bucket', 'object') - ui_color = '#f0eee4' - - @apply_defaults - def __init__( - self, - bucket, - object, # pylint:disable=redefined-builtin - google_cloud_conn_id='google_cloud_default', - delegate_to=None, - *args, - **kwargs): - """ - Create a new GoogleCloudStorageObjectSensor. + Create a new GoogleCloudStorageObjectSensor. :param bucket: The Google cloud storage bucket where the object is. :type bucket: string @@ -49,9 +35,21 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator): connecting to Google cloud storage. :type google_cloud_storage_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide delegation enabled. + For this to work, the service account making the request must have + domain-wide delegation enabled. :type delegate_to: string - """ + """ + template_fields = ('bucket', 'object') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + bucket, + object, # pylint:disable=redefined-builtin + google_cloud_conn_id='google_cloud_default', + delegate_to=None, + *args, **kwargs): + super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs) self.bucket = bucket self.object = object @@ -78,23 +76,7 @@ def ts_function(context): class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator): """ Checks if an object is updated in Google Cloud Storage. - """ - template_fields = ('bucket', 'object') - template_ext = ('.sql',) - ui_color = '#f0eee4' - - @apply_defaults - def __init__( - self, - bucket, - object, # pylint:disable=redefined-builtin - ts_func=ts_function, - google_cloud_conn_id='google_cloud_default', - delegate_to=None, - *args, - **kwargs): - """ - Create a new GoogleCloudStorageObjectUpdatedSensor. + Create a new GoogleCloudStorageObjectUpdatedSensor. :param bucket: The Google cloud storage bucket where the object is. :type bucket: string @@ -112,7 +94,20 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator): For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: string - """ + """ + template_fields = ('bucket', 'object') + template_ext = ('.sql',) + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + bucket, + object, # pylint:disable=redefined-builtin + ts_func=ts_function, + google_cloud_conn_id='google_cloud_default', + delegate_to=None, + *args, **kwargs): + super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs) self.bucket = bucket self.object = object @@ -131,21 +126,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator): class GoogleCloudStoragePrefixSensor(BaseSensorOperator): """ Checks for the existence of a files at prefix in Google Cloud Storage bucket. - """ - template_fields = ('bucket', 'prefix') - ui_color = '#f0eee4' - - @apply_defaults - def __init__( - self, - bucket, - prefix, - google_cloud_conn_id='google_cloud_default', - delegate_to=None, - *args, - **kwargs): - """ - Create a new GoogleCloudStorageObjectSensor. + Create a new GoogleCloudStorageObjectSensor. :param bucket: The Google cloud storage bucket where the object is. :type bucket: string @@ -156,9 +137,20 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator): connecting to Google cloud storage. :type google_cloud_storage_conn_id: string :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide delegation enabled. + For this to work, the service account making the request must have + domain-wide delegation enabled. :type delegate_to: string - """ + """ + template_fields = ('bucket', 'prefix') + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, + bucket, + prefix, + google_cloud_conn_id='google_cloud_default', + delegate_to=None, + *args, **kwargs): super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs) self.bucket = bucket self.prefix = prefix @@ -166,7 +158,8 @@ class GoogleCloudStoragePrefixSensor(BaseSensorOperator): self.delegate_to = delegate_to def poke(self, context): - self.log.info('Sensor checks existence of objects: %s, %s', self.bucket, self.prefix) + self.log.info('Sensor checks existence of objects: %s, %s', + self.bucket, self.prefix) hook = GoogleCloudStorageHook( google_cloud_storage_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/hdfs_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/hdfs_sensor.py b/airflow/contrib/sensors/hdfs_sensor.py index 5193add..832b81b 100644 --- a/airflow/contrib/sensors/hdfs_sensor.py +++ b/airflow/contrib/sensors/hdfs_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/jira_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py index 6f73d19..0583c22 100644 --- a/airflow/contrib/sensors/jira_sensor.py +++ b/airflow/contrib/sensors/jira_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -136,12 +136,16 @@ class JiraTicketSensor(JiraSensor): ) except JIRAError as jira_error: - self.log.error("Jira error while checking with expected value: %s", jira_error) + self.log.error("Jira error while checking with expected value: %s", + jira_error) except Exception as e: - self.log.error("Error while checking with expected value %s:", self.expected_value) + self.log.error("Error while checking with expected value %s:", + self.expected_value) self.log.exception(e) if result is True: - self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value) + self.log.info("Issue field %s has expected value %s, returning success", + self.field, self.expected_value) else: - self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value) + self.log.info("Issue field %s don't have expected value %s yet.", + self.field, self.expected_value) return result http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/pubsub_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/pubsub_sensor.py b/airflow/contrib/sensors/pubsub_sensor.py index c6b8467..7d17215 100644 --- a/airflow/contrib/sensors/pubsub_sensor.py +++ b/airflow/contrib/sensors/pubsub_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/redis_key_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py index 0ecd34a..baf3e16 100644 --- a/airflow/contrib/sensors/redis_key_sensor.py +++ b/airflow/contrib/sensors/redis_key_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/sftp_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/sftp_sensor.py b/airflow/contrib/sensors/sftp_sensor.py index 08559e9..801943a 100644 --- a/airflow/contrib/sensors/sftp_sensor.py +++ b/airflow/contrib/sensors/sftp_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/sensors/wasb_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py index c24f626..ec6a63b 100644 --- a/airflow/contrib/sensors/wasb_sensor.py +++ b/airflow/contrib/sensors/wasb_sensor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06b62c42/airflow/contrib/task_runner/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/task_runner/__init__.py b/airflow/contrib/task_runner/__init__.py index 4067cc7..114d189 100644 --- a/airflow/contrib/task_runner/__init__.py +++ b/airflow/contrib/task_runner/__init__.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
