http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/salesforce_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py index 67d1605..f2b5fef 100644 --- a/airflow/contrib/hooks/salesforce_hook.py +++ b/airflow/contrib/hooks/salesforce_hook.py @@ -24,14 +24,15 @@ NOTE: this hook also relies on the simple_salesforce package: from simple_salesforce import Salesforce from airflow.hooks.base_hook import BaseHook -import logging import json import pandas as pd import time +from airflow.utils.log.LoggingMixin import LoggingMixin -class SalesforceHook(BaseHook): + +class SalesforceHook(BaseHook, LoggingMixin): def __init__( self, conn_id, @@ -91,13 +92,12 @@ class SalesforceHook(BaseHook): """ self.sign_in() - logging.info("Querying for all objects") + self.logger.info("Querying for all objects") query = self.sf.query_all(query) - logging.info( - "Received results: Total size: {0}; Done: {1}".format( - query['totalSize'], query['done'] - ) + self.logger.info( + "Received results: Total size: %s; Done: %s", + query['totalSize'], query['done'] ) query = json.loads(json.dumps(query)) @@ -144,11 +144,9 @@ class SalesforceHook(BaseHook): field_string = self._build_field_list(fields) query = "SELECT {0} FROM {1}".format(field_string, obj) - logging.info( - "Making query to salesforce: {0}".format( - query if len(query) < 30 - else " ... ".join([query[:15], query[-15:]]) - ) + self.logger.info( + "Making query to Salesforce: %s", + query if len(query) < 30 else " ... ".join([query[:15], query[-15:]]) ) return self.make_query(query) @@ -171,8 +169,9 @@ class SalesforceHook(BaseHook): try: col = pd.to_datetime(col) except ValueError: - logging.warning( - "Could not convert field to timestamps: {0}".format(col.name) + log = LoggingMixin().logger + log.warning( + "Could not convert field to timestamps: %s", col.name ) return col @@ -266,7 +265,7 @@ class SalesforceHook(BaseHook): # for each returned record object_name = query_results[0]['attributes']['type'] - logging.info("Coercing timestamps for: {0}".format(object_name)) + self.logger.info("Coercing timestamps for: %s", object_name) schema = self.describe_object(object_name) @@ -300,7 +299,7 @@ class SalesforceHook(BaseHook): # there are also a ton of newline objects # that mess up our ability to write to csv # we remove these newlines so that the output is a valid CSV format - logging.info("Cleaning data and writing to CSV") + self.logger.info("Cleaning data and writing to CSV") possible_strings = df.columns[df.dtypes == "object"] df[possible_strings] = df[possible_strings].apply( lambda x: x.str.replace("\r\n", "")
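The hunks in this commit all apply the same migration: the module-level "import logging" is dropped and log records are emitted through the per-instance logger that LoggingMixin exposes as self.logger, with lazy %-style arguments instead of eagerly formatted strings. A minimal sketch of the pattern as it is used in this tree follows; the hook class and connection id below are hypothetical, and the airflow.utils.log.LoggingMixin import path is the one introduced by this change:

    from airflow.hooks.base_hook import BaseHook
    from airflow.utils.log.LoggingMixin import LoggingMixin


    class MyServiceHook(BaseHook, LoggingMixin):
        """Hypothetical hook illustrating the logging migration."""

        def __init__(self, my_conn_id='my_service_default'):
            self.my_conn_id = my_conn_id

        def run_query(self, query):
            # Arguments are passed separately, so the message is only
            # interpolated if the INFO level is enabled for this logger.
            self.logger.info("Executing query on %s: %s", self.my_conn_id, query)

The mixin gives each class its own named logger, and passing the arguments lazily avoids the cost of str.format() for records that end up being filtered out.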
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_sql_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py index d7bef7b..aa16130 100644 --- a/airflow/contrib/hooks/spark_sql_hook.py +++ b/airflow/contrib/hooks/spark_sql_hook.py @@ -12,16 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import logging import subprocess from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException +from airflow.utils.log.LoggingMixin import LoggingMixin -log = logging.getLogger(__name__) - -class SparkSqlHook(BaseHook): +class SparkSqlHook(BaseHook, LoggingMixin): """ This hook is a wrapper around the spark-sql binary. It requires that the "spark-sql" binary is in the PATH. @@ -123,7 +121,7 @@ class SparkSqlHook(BaseHook): connection_cmd += ["--queue", self._yarn_queue] connection_cmd += cmd - logging.debug("Spark-Sql cmd: {}".format(connection_cmd)) + self.logger.debug("Spark-Sql cmd: %s", connection_cmd) return connection_cmd @@ -153,5 +151,5 @@ class SparkSqlHook(BaseHook): def kill(self): if self._sp and self._sp.poll() is None: - logging.info("Killing the Spark-Sql job") + self.logger.info("Killing the Spark-Sql job") self._sp.kill() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index a667753..bdd1efe 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -12,18 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import logging import os import subprocess import re from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException +from airflow.utils.log.LoggingMixin import LoggingMixin -log = logging.getLogger(__name__) - -class SparkSubmitHook(BaseHook): +class SparkSubmitHook(BaseHook, LoggingMixin): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. 
It requires that the "spark-submit" binary is in the PATH or the spark_home to be @@ -63,7 +61,6 @@ class SparkSubmitHook(BaseHook): :param verbose: Whether to pass the verbose flag to spark-submit process for debugging :type verbose: bool """ - def __init__(self, conf=None, conn_id='spark_default', @@ -126,10 +123,9 @@ class SparkSubmitHook(BaseHook): conn_data['spark_home'] = extra.get('spark-home', None) conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit') except AirflowException: - logging.debug( - "Could not load connection string {}, defaulting to {}".format( - self._conn_id, conn_data['master'] - ) + self.logger.debug( + "Could not load connection string %s, defaulting to %s", + self._conn_id, conn_data['master'] ) return conn_data @@ -196,7 +192,7 @@ class SparkSubmitHook(BaseHook): if self._application_args: connection_cmd += self._application_args - logging.debug("Spark-Submit cmd: {}".format(connection_cmd)) + self.logger.debug("Spark-Submit cmd: %s", connection_cmd) return connection_cmd @@ -243,15 +239,15 @@ class SparkSubmitHook(BaseHook): self._yarn_application_id = match.groups()[0] # Pass to logging - logging.info(line) + self.logger.info(line) def on_kill(self): if self._sp and self._sp.poll() is None: - logging.info('Sending kill signal to {}'.format(self._connection['spark_binary'])) + self.logger.info('Sending kill signal to %s', self._connection['spark_binary']) self._sp.kill() if self._yarn_application_id: - logging.info('Killing application on YARN') + self.logger.info('Killing application on YARN') kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split() yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - logging.info("YARN killed with return code: {0}".format(yarn_kill.wait())) + self.logger.info("YARN killed with return code: %s", yarn_kill.wait()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/sqoop_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py index 7fbb6c5..0584df4 100644 --- a/airflow/contrib/hooks/sqoop_hook.py +++ b/airflow/contrib/hooks/sqoop_hook.py @@ -16,17 +16,14 @@ """ This module contains a sqoop 1.x hook """ - -import logging import subprocess from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook - -log = logging.getLogger(__name__) +from airflow.utils.log.LoggingMixin import LoggingMixin -class SqoopHook(BaseHook): +class SqoopHook(BaseHook, LoggingMixin): """ This hook is a wrapper around the sqoop 1 binary. To be able to use the hook it is required that "sqoop" is in the PATH. 
@@ -79,7 +76,7 @@ class SqoopHook(BaseHook): password_index = cmd.index('--password') cmd[password_index + 1] = 'MASKED' except ValueError: - logging.debug("No password in sqoop cmd") + self.logger.debug("No password in sqoop cmd") return cmd def Popen(self, cmd, **kwargs): @@ -90,21 +87,21 @@ class SqoopHook(BaseHook): :param kwargs: extra arguments to Popen (see subprocess.Popen) :return: handle to subprocess """ - logging.info("Executing command: {}".format(' '.join(cmd))) + self.logger.info("Executing command: %s", ' '.join(cmd)) sp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs) for line in iter(sp.stdout): - logging.info(line.strip()) + self.logger.info(line.strip()) sp.wait() - logging.info("Command exited with return code {0}".format(sp.returncode)) + self.logger.info("Command exited with return code %s", sp.returncode) if sp.returncode: - raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd))) + raise AirflowException("Sqoop command failed: %s", ' '.join(cmd)) def _prepare_command(self, export=False): if export: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ssh_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py index f1e25a6..3fe9146 100755 --- a/airflow/contrib/hooks/ssh_hook.py +++ b/airflow/contrib/hooks/ssh_hook.py @@ -16,7 +16,6 @@ # limitations under the License. import getpass -import logging import os import paramiko @@ -24,9 +23,10 @@ import paramiko from contextlib import contextmanager from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook +from airflow.utils.log.LoggingMixin import LoggingMixin -class SSHHook(BaseHook): +class SSHHook(BaseHook, LoggingMixin): """ Hook for ssh remote execution using Paramiko. ref: https://github.com/paramiko/paramiko @@ -70,7 +70,7 @@ class SSHHook(BaseHook): def get_conn(self): if not self.client: - logging.debug('creating ssh client for conn_id: {0}'.format(self.ssh_conn_id)) + self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id) if self.ssh_conn_id is not None: conn = self.get_connection(self.ssh_conn_id) if self.username is None: @@ -98,9 +98,11 @@ class SSHHook(BaseHook): # Auto detecting username values from system if not self.username: - logging.debug("username to ssh to host: {0} is not specified, using " - "system's default provided by getpass.getuser()" - .format(self.remote_host, self.ssh_conn_id)) + self.logger.debug( + "username to ssh to host: %s is not specified for connection id" + " %s. 
Using system's default provided by getpass.getuser()", + self.remote_host, self.ssh_conn_id + ) self.username = getpass.getuser() host_proxy = None @@ -140,14 +142,20 @@ class SSHHook(BaseHook): self.client = client except paramiko.AuthenticationException as auth_error: - logging.error("Auth failed while connecting to host: {0}, error: {1}" - .format(self.remote_host, auth_error)) + self.logger.error( + "Auth failed while connecting to host: %s, error: %s", + self.remote_host, auth_error + ) except paramiko.SSHException as ssh_error: - logging.error("Failed connecting to host: {0}, error: {1}" - .format(self.remote_host, ssh_error)) + self.logger.error( + "Failed connecting to host: %s, error: %s", + self.remote_host, ssh_error + ) except Exception as error: - logging.error("Error connecting to host: {0}, error: {1}" - .format(self.remote_host, error)) + self.logger.error( + "Error connecting to host: %s, error: %s", + self.remote_host, error + ) return self.client @contextmanager @@ -183,7 +191,7 @@ class SSHHook(BaseHook): ] ssh_cmd += ssh_tunnel_cmd - logging.debug("creating tunnel with cmd: {0}".format(ssh_cmd)) + self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd) proc = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 3b804a8..37e4a97 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.contrib.hooks.bigquery_hook import BigQueryHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -89,7 +87,7 @@ class BigQueryOperator(BaseOperator): self.query_params = query_params def execute(self, context): - logging.info('Executing: %s', self.bql) + self.logger.info('Executing: %s', self.bql) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) conn = hook.get_conn() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_table_delete_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py index 643f5ac..21de7cc 100644 --- a/airflow/contrib/operators/bigquery_table_delete_operator.py +++ b/airflow/contrib/operators/bigquery_table_delete_operator.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging - from airflow.contrib.hooks.bigquery_hook import BigQueryHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -55,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator): self.ignore_if_missing = ignore_if_missing def execute(self, context): - logging.info('Deleting: %s', self.deletion_dataset_table) + self.logger.info('Deleting: %s', self.deletion_dataset_table) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) conn = hook.get_conn() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_bigquery.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py index 6f4843c..8e21270 100644 --- a/airflow/contrib/operators/bigquery_to_bigquery.py +++ b/airflow/contrib/operators/bigquery_to_bigquery.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.contrib.hooks.bigquery_hook import BigQueryHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -70,8 +68,10 @@ class BigQueryToBigQueryOperator(BaseOperator): self.delegate_to = delegate_to def execute(self, context): - logging.info('Executing copy of %s into: %s', self.source_project_dataset_tables, - self.destination_project_dataset_table) + self.logger.info( + 'Executing copy of %s into: %s', + self.source_project_dataset_tables, self.destination_project_dataset_table + ) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) conn = hook.get_conn() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py index aaff462..23a2029 100644 --- a/airflow/contrib/operators/bigquery_to_gcs.py +++ b/airflow/contrib/operators/bigquery_to_gcs.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.contrib.hooks.bigquery_hook import BigQueryHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -81,7 +79,7 @@ class BigQueryToCloudStorageOperator(BaseOperator): self.delegate_to = delegate_to def execute(self, context): - logging.info('Executing extract of %s into: %s', + self.logger.info('Executing extract of %s into: %s', self.source_project_dataset_table, self.destination_cloud_storage_uris) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/databricks_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py index 1aa1441..8773357 100644 --- a/airflow/contrib/operators/databricks_operator.py +++ b/airflow/contrib/operators/databricks_operator.py @@ -13,7 +13,6 @@ # limitations under the License. 
# -import logging import six import time @@ -21,8 +20,6 @@ from airflow.exceptions import AirflowException from airflow.contrib.hooks.databricks_hook import DatabricksHook from airflow.models import BaseOperator -LINE_BREAK = ('-' * 80) - class DatabricksSubmitRunOperator(BaseOperator): """ @@ -217,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator): raise AirflowException(msg) def _log_run_page_url(self, url): - logging.info('View run status, Spark UI, and logs at {}'.format(url)) + self.logger.info('View run status, Spark UI, and logs at %s', url) def get_hook(self): return DatabricksHook( @@ -228,16 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator): hook = self.get_hook() self.run_id = hook.submit_run(self.json) run_page_url = hook.get_run_page_url(self.run_id) - logging.info(LINE_BREAK) - logging.info('Run submitted with run_id: {}'.format(self.run_id)) + self.logger.info('Run submitted with run_id: %s', self.run_id) self._log_run_page_url(run_page_url) - logging.info(LINE_BREAK) while True: run_state = hook.get_run_state(self.run_id) if run_state.is_terminal: if run_state.is_successful: - logging.info('{} completed successfully.'.format( - self.task_id)) + self.logger.info('%s completed successfully.', self.task_id) self._log_run_page_url(run_page_url) return else: @@ -246,16 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator): s=run_state) raise AirflowException(error_message) else: - logging.info('{t} in run state: {s}'.format(t=self.task_id, - s=run_state)) + self.logger.info('%s in run state: %s', self.task_id, run_state) self._log_run_page_url(run_page_url) - logging.info('Sleeping for {} seconds.'.format( - self.polling_period_seconds)) + self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds) time.sleep(self.polling_period_seconds) def on_kill(self): hook = self.get_hook() hook.cancel_run(self.run_id) - logging.info('Task: {t} with run_id: {r} was requested to be cancelled.'.format( - t=self.task_id, - r=self.run_id)) + self.logger.info( + 'Task: %s with run_id: %s was requested to be cancelled.', + self.task_id, self.run_id + ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index c0ff6a7..3c22b60 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -13,7 +13,6 @@ # limitations under the License. 
# -import logging import time from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook @@ -178,13 +177,14 @@ class DataprocClusterCreateOperator(BaseOperator): while True: state = self._get_cluster_state(service) if state is None: - logging.info("No state for cluster '%s'", self.cluster_name) + self.logger.info("No state for cluster '%s'", self.cluster_name) time.sleep(15) else: - logging.info("State for cluster '%s' is %s", self.cluster_name, state) + self.logger.info("State for cluster '%s' is %s", self.cluster_name, state) if self._cluster_ready(state, service): - logging.info("Cluster '%s' successfully created", - self.cluster_name) + self.logger.info( + "Cluster '%s' successfully created", self.cluster_name + ) return time.sleep(15) @@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator): return cluster_data def execute(self, context): - logging.info('Creating cluster: {}'.format(self.cluster_name)) + self.logger.info('Creating cluster: %s', self.cluster_name) hook = DataProcHook( gcp_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to @@ -272,9 +272,10 @@ class DataprocClusterCreateOperator(BaseOperator): service = hook.get_conn() if self._get_cluster(service): - logging.info('Cluster {} already exists... Checking status...'.format( - self.cluster_name - )) + self.logger.info( + 'Cluster %s already exists... Checking status...', + self.cluster_name + ) self._wait_for_done(service) return True @@ -289,9 +290,10 @@ class DataprocClusterCreateOperator(BaseOperator): # probably two cluster start commands at the same time time.sleep(10) if self._get_cluster(service): - logging.info('Cluster {} already exists... Checking status...'.format( - self.cluster_name - )) + self.logger.info( + 'Cluster {} already exists... Checking status...', + self.cluster_name + ) self._wait_for_done(service) return True else: @@ -356,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator): time.sleep(15) def execute(self, context): - logging.info('Deleting cluster: {}'.format(self.cluster_name)) + self.logger.info('Deleting cluster: %s', self.cluster_name) hook = DataProcHook( gcp_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to @@ -369,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator): clusterName=self.cluster_name ).execute() operation_name = response['name'] - logging.info("Cluster delete operation name: {}".format(operation_name)) + self.logger.info("Cluster delete operation name: %s", operation_name) self._wait_for_done(service, operation_name) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_export_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py index 1980dfe..76415e1 100644 --- a/airflow/contrib/operators/datastore_export_operator.py +++ b/airflow/contrib/operators/datastore_export_operator.py @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - -import logging from airflow.contrib.hooks.datastore_hook import DatastoreHook from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults + class DatastoreExportOperator(BaseOperator): """ Export entities from Google Cloud Datastore to Cloud Storage @@ -79,7 +78,7 @@ class DatastoreExportOperator(BaseOperator): self.xcom_push = xcom_push def execute(self, context): - logging.info('Exporting data to Cloud Storage bucket ' + self.bucket) + self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket) if self.overwrite_existing and self.namespace: gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_import_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py index 3427ba5..74bd940 100644 --- a/airflow/contrib/operators/datastore_import_operator.py +++ b/airflow/contrib/operators/datastore_import_operator.py @@ -12,13 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import logging - from airflow.contrib.hooks.datastore_hook import DatastoreHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults + class DatastoreImportOperator(BaseOperator): """ Import entities from Cloud Storage to Google Cloud Datastore @@ -74,7 +72,7 @@ class DatastoreImportOperator(BaseOperator): self.xcom_push = xcom_push def execute(self, context): - logging.info('Importing data from Cloud Storage bucket ' + self.bucket) + self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket) ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to) result = ds_hook.import_from_storage_bucket(bucket=self.bucket, file=self.file, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ecs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py index 11f8c94..0c75eaa 100644 --- a/airflow/contrib/operators/ecs_operator.py +++ b/airflow/contrib/operators/ecs_operator.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import sys -import logging from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -57,12 +56,11 @@ class ECSOperator(BaseOperator): self.hook = self.get_hook() def execute(self, context): - - logging.info('Running ECS Task - Task definition: {} - on cluster {}'.format( - self.task_definition, - self.cluster - )) - logging.info('ECSOperator overrides: {}'.format(self.overrides)) + self.logger.info( + 'Running ECS Task - Task definition: %s - on cluster %s', + self.task_definition,self.cluster + ) + self.logger.info('ECSOperator overrides: %s', self.overrides) self.client = self.hook.get_client_type( 'ecs', @@ -77,15 +75,15 @@ class ECSOperator(BaseOperator): ) failures = response['failures'] - if (len(failures) > 0): + if len(failures) > 0: raise AirflowException(response) - logging.info('ECS Task started: {}'.format(response)) + self.logger.info('ECS Task started: %s', response) self.arn = response['tasks'][0]['taskArn'] self._wait_for_task_ended() self._check_success_task() - logging.info('ECS Task has been successfully executed: {}'.format(response)) + self.logger.info('ECS Task has been successfully executed: %s', response) def _wait_for_task_ended(self): waiter = self.client.get_waiter('tasks_stopped') @@ -100,9 +98,9 @@ class ECSOperator(BaseOperator): cluster=self.cluster, tasks=[self.arn] ) - logging.info('ECS Task stopped, check status: {}'.format(response)) + self.logger.info('ECS Task stopped, check status: %s', response) - if (len(response.get('failures', [])) > 0): + if len(response.get('failures', [])) > 0: raise AirflowException(response) for task in response['tasks']: @@ -126,4 +124,4 @@ class ECSOperator(BaseOperator): cluster=self.cluster, task=self.arn, reason='Task killed by the user') - logging.info(response) + self.logger.info(response) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_add_steps_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py index 84ef2d0..dbf764e 100644 --- a/airflow/contrib/operators/emr_add_steps_operator.py +++ b/airflow/contrib/operators/emr_add_steps_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging - from airflow.models import BaseOperator from airflow.utils import apply_defaults from airflow.exceptions import AirflowException @@ -51,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator): def execute(self, context): emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn() - logging.info('Adding steps to %s', self.job_flow_id) + self.logger.info('Adding steps to %s', self.job_flow_id) response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: raise AirflowException('Adding steps failed: %s' % response) else: - logging.info('Steps %s added to JobFlow', response['StepIds']) + self.logger.info('Steps %s added to JobFlow', response['StepIds']) return response['StepIds'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_create_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py index 37d885d..4e40b17 100644 --- a/airflow/contrib/operators/emr_create_job_flow_operator.py +++ b/airflow/contrib/operators/emr_create_job_flow_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.emr_hook import EmrHook from airflow.models import BaseOperator from airflow.utils import apply_defaults @@ -53,11 +50,14 @@ class EmrCreateJobFlowOperator(BaseOperator): def execute(self, context): emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id) - logging.info('Creating JobFlow') + self.logger.info( + 'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s', + self.aws_conn_id, self.emr_conn_id + ) response = emr.create_job_flow(self.job_flow_overrides) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: raise AirflowException('JobFlow creation failed: %s' % response) else: - logging.info('JobFlow with id %s created', response['JobFlowId']) + self.logger.info('JobFlow with id %s created', response['JobFlowId']) return response['JobFlowId'] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_terminate_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py index 1b57276..df641ad 100644 --- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py +++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging - from airflow.models import BaseOperator from airflow.utils import apply_defaults from airflow.exceptions import AirflowException @@ -46,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator): def execute(self, context): emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn() - logging.info('Terminating JobFlow %s', self.job_flow_id) + self.logger.info('Terminating JobFlow %s', self.job_flow_id) response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id]) if not response['ResponseMetadata']['HTTPStatusCode'] == 200: raise AirflowException('JobFlow termination failed: %s' % response) else: - logging.info('JobFlow with id %s terminated', self.job_flow_id) + self.logger.info('JobFlow with id %s terminated', self.job_flow_id) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/file_to_wasb.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py index 32e6b29..4519e1e 100644 --- a/airflow/contrib/operators/file_to_wasb.py +++ b/airflow/contrib/operators/file_to_wasb.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import logging - from airflow.contrib.hooks.wasb_hook import WasbHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults class FileToWasbOperator(BaseOperator): """ Uploads a file to Azure Blob Storage. - + :param file_path: Path to the file to load. :type file_path: str :param container_name: Name of the container. @@ -54,8 +51,7 @@ class FileToWasbOperator(BaseOperator): def execute(self, context): """Upload a file to Azure Blob Storage.""" hook = WasbHook(wasb_conn_id=self.wasb_conn_id) - logging.info( - 'Uploading {self.file_path} to wasb://{self.container_name} as ' - '{self.blob_name}'.format(**locals())) - hook.load_file(self.file_path, self.container_name, self.blob_name, - **self.load_options) + self.logger.info( + 'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals()) + ) + hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/fs_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py index 2596487..ca7d716 100644 --- a/airflow/contrib/operators/fs_operator.py +++ b/airflow/contrib/operators/fs_operator.py @@ -14,7 +14,6 @@ # from os import walk -import logging from airflow.operators.sensors import BaseSensorOperator from airflow.contrib.hooks.fs_hook import FSHook @@ -49,8 +48,7 @@ class FileSensor(BaseSensorOperator): hook = FSHook(self.fs_conn_id) basepath = hook.get_path() full_path = "/".join([basepath, self.filepath]) - logging.info( - 'Poking for file {full_path} '.format(**locals())) + self.logger.info('Poking for file {full_path}'.format(**locals())) try: files = [f for f in walk(full_path)] except: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_download_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_download_operator.py 
b/airflow/contrib/operators/gcs_download_operator.py index c17f774..27e85b7 100644 --- a/airflow/contrib/operators/gcs_download_operator.py +++ b/airflow/contrib/operators/gcs_download_operator.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import sys from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook @@ -48,7 +47,6 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',) ui_color = '#f0eee4' - @apply_defaults def __init__(self, bucket, object, @@ -67,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): self.delegate_to = delegate_to def execute(self, context): - logging.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename) + self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename) hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, delegate_to=self.delegate_to) file_bytes = hook.download(self.bucket, self.object, self.filename) @@ -76,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator): context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes) else: raise RuntimeError('The size of the downloaded file is too large to push to XCom!') - logging.info(file_bytes) + self.logger.info(file_bytes) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 39f0a48..01f53cc 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -13,7 +13,6 @@ # limitations under the License. 
import json -import logging from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.contrib.hooks.bigquery_hook import BigQueryHook @@ -190,7 +189,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): self.destination_project_dataset_table)) row = cursor.fetchone() max_id = row[0] if row[0] else 0 - logging.info('Loaded BQ data with max {}.{}={}'.format( - self.destination_project_dataset_table, - self.max_id_key, max_id)) + self.logger.info( + 'Loaded BQ data with max %s.%s=%s', + self.destination_project_dataset_table, self.max_id_key, max_id + ) return max_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/hipchat_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py index aeb37d9..19c6d76 100644 --- a/airflow/contrib/operators/hipchat_operator.py +++ b/airflow/contrib/operators/hipchat_operator.py @@ -17,7 +17,6 @@ from builtins import str from airflow.utils.decorators import apply_defaults from airflow.models import BaseOperator from airflow.exceptions import AirflowException -import logging import requests import json @@ -67,7 +66,7 @@ class HipChatAPIOperator(BaseOperator): 'Authorization': 'Bearer %s' % self.token}, data=self.body) if response.status_code >= 400: - logging.error('HipChat API call failed: %s %s', + self.logger.error('HipChat API call failed: %s %s', response.status_code, response.reason) raise AirflowException('HipChat API call failed: %s %s' % (response.status_code, response.reason)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py index 7476825..fdbfede 100644 --- a/airflow/contrib/operators/mlengine_operator.py +++ b/airflow/contrib/operators/mlengine_operator.py @@ -13,8 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging import re from airflow import settings @@ -24,8 +22,9 @@ from airflow.operators import BaseOperator from airflow.utils.decorators import apply_defaults from apiclient import errors +from airflow.utils.log.LoggingMixin import LoggingMixin -logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL) +log = LoggingMixin().logger def _create_prediction_input(project_id, @@ -52,7 +51,6 @@ def _create_prediction_input(project_id, Raises: ValueError: if a unique model/version origin cannot be determined. """ - prediction_input = { 'dataFormat': data_format, 'inputPaths': input_paths, @@ -62,9 +60,9 @@ def _create_prediction_input(project_id, if uri: if model_name or version_name: - logging.error( - 'Ambiguous model origin: Both uri and model/version name are ' - 'provided.') + log.error( + 'Ambiguous model origin: Both uri and model/version name are provided.' 
+ ) raise ValueError('Ambiguous model origin.') prediction_input['uri'] = uri elif model_name: @@ -75,7 +73,7 @@ def _create_prediction_input(project_id, prediction_input['versionName'] = \ origin_name + '/versions/{}'.format(version_name) else: - logging.error( + log.error( 'Missing model origin: Batch prediction expects a model, ' 'a model & version combination, or a URI to savedModel.') raise ValueError('Missing model origin.') @@ -227,9 +225,10 @@ class MLEngineBatchPredictionOperator(BaseOperator): model_name, version_name, uri, max_worker_count, runtime_version) except ValueError as e: - logging.error( - 'Cannot create batch prediction job request due to: {}' - .format(str(e))) + self.logger.error( + 'Cannot create batch prediction job request due to: %s', + e + ) raise self.prediction_job_request = { @@ -252,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator): raise if finished_prediction_job['state'] != 'SUCCEEDED': - logging.error( + self.logger.error( 'Batch prediction job failed: %s', str(finished_prediction_job)) raise RuntimeError(finished_prediction_job['errorMessage']) @@ -539,9 +538,8 @@ class MLEngineTrainingOperator(BaseOperator): } if self._mode == 'DRY_RUN': - logging.info('In dry_run mode.') - logging.info( - 'MLEngine Training job request is: {}'.format(training_request)) + self.logger.info('In dry_run mode.') + self.logger.info('MLEngine Training job request is: {}'.format(training_request)) return hook = MLEngineHook( @@ -559,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator): raise if finished_training_job['state'] != 'SUCCEEDED': - logging.error('MLEngine training job failed: {}'.format( + self.logger.error('MLEngine training job failed: {}'.format( str(finished_training_job))) raise RuntimeError(finished_training_job['errorMessage']) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_prediction_summary.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py index 1f4d540..17fc2c0 100644 --- a/airflow/contrib/operators/mlengine_prediction_summary.py +++ b/airflow/contrib/operators/mlengine_prediction_summary.py @@ -95,7 +95,6 @@ from __future__ import print_function import argparse import base64 import json -import logging import os import apache_beam as beam @@ -173,5 +172,4 @@ def run(argv=None): if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) run() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mysql_to_gcs.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py index 374567b..7e83bce 100644 --- a/airflow/contrib/operators/mysql_to_gcs.py +++ b/airflow/contrib/operators/mysql_to_gcs.py @@ -13,14 +13,12 @@ # limitations under the License. 
import json -import logging import time from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults -from collections import OrderedDict from datetime import date, datetime from decimal import Decimal from MySQLdb.constants import FIELD_TYPE @@ -170,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): 'mode': field_mode, }) - logging.info('Using schema for %s: %s', self.schema_filename, schema) + self.logger.info('Using schema for %s: %s', self.schema_filename, schema) tmp_schema_file_handle = NamedTemporaryFile(delete=True) json.dump(schema, tmp_schema_file_handle) return {self.schema_filename: tmp_schema_file_handle} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/sftp_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py index b9f07d5..5abfc51 100644 --- a/airflow/contrib/operators/sftp_operator.py +++ b/airflow/contrib/operators/sftp_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.ssh_hook import SSHHook from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -84,12 +81,12 @@ class SFTPOperator(BaseOperator): if self.operation.lower() == SFTPOperation.GET: file_msg = "from {0} to {1}".format(self.remote_filepath, self.local_filepath) - logging.debug("Starting to transfer {0}".format(file_msg)) + self.logger.debug("Starting to transfer %s", file_msg) sftp_client.get(self.remote_filepath, self.local_filepath) else: file_msg = "from {0} to {1}".format(self.local_filepath, self.remote_filepath) - logging.debug("Starting to transfer file {0}".format(file_msg)) + self.logger.debug("Starting to transfer file %s", file_msg) sftp_client.put(self.local_filepath, self.remote_filepath) except Exception as e: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/spark_submit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py index ca628e9..2aed4c6 100644 --- a/airflow/contrib/operators/spark_submit_operator.py +++ b/airflow/contrib/operators/spark_submit_operator.py @@ -12,14 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import logging - from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults from airflow.settings import WEB_COLORS - -log = logging.getLogger(__name__) +from airflow.utils.decorators import apply_defaults class SparkSubmitOperator(BaseOperator): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ssh_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py index 2e03b96..897cd1a 100644 --- a/airflow/contrib/operators/ssh_operator.py +++ b/airflow/contrib/operators/ssh_operator.py @@ -13,7 +13,6 @@ # limitations under the License. from base64 import b64encode -import logging from airflow import configuration from airflow.contrib.hooks.ssh_hook import SSHHook http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py index 9266563..fc9cf3b 100644 --- a/airflow/contrib/operators/vertica_operator.py +++ b/airflow/contrib/operators/vertica_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.vertica_hook import VerticaHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -42,6 +39,6 @@ class VerticaOperator(BaseOperator): self.sql = sql def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) hook = VerticaHook(vertica_conn_id=self.vertica_conn_id) hook.run(self.sql) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py index 31ce110..35ff3c6 100644 --- a/airflow/contrib/operators/vertica_to_hive.py +++ b/airflow/contrib/operators/vertica_to_hive.py @@ -15,7 +15,6 @@ from builtins import chr from collections import OrderedDict import unicodecsv as csv -import logging from tempfile import NamedTemporaryFile from airflow.hooks.hive_hooks import HiveCliHook @@ -104,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id) - logging.info("Dumping Vertica query results to local file") + self.logger.info("Dumping Vertica query results to local file") conn = vertica.get_conn() cursor = conn.cursor() cursor.execute(self.sql) @@ -120,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator): f.flush() cursor.close() conn.close() - logging.info("Loading file into Hive") + self.logger.info("Loading file into Hive") hive.load_file( f.name, self.hive_table, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/bigquery_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py index 
8a8ca62..630cebe 100644 --- a/airflow/contrib/sensors/bigquery_sensor.py +++ b/airflow/contrib/sensors/bigquery_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.operators.sensors import BaseSensorOperator from airflow.contrib.hooks.bigquery_hook import BigQueryHook from airflow.utils.decorators import apply_defaults @@ -62,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator): def poke(self, context): table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id) - logging.info('Sensor checks existence of table: %s', table_uri) + self.logger.info('Sensor checks existence of table: %s', table_uri) hook = BigQueryHook( bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/datadog_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py index d8660f7..4ee45f9 100644 --- a/airflow/contrib/sensors/datadog_sensor.py +++ b/airflow/contrib/sensors/datadog_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.operators.sensors import BaseSensorOperator from airflow.contrib.hooks.datadog_hook import DatadogHook from airflow.utils import apply_defaults @@ -70,7 +67,7 @@ class DatadogSensor(BaseSensorOperator): tags=self.tags) if isinstance(response, dict) and response.get('status', 'ok') != 'ok': - logging.error("Unexpected datadog result: %s" % (response)) + self.logger.error("Unexpected Datadog result: %s", response) raise AirflowException("Datadog returned unexpected result") if self.response_check: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py index 5526604..034fcb6 100644 --- a/airflow/contrib/sensors/emr_base_sensor.py +++ b/airflow/contrib/sensors/emr_base_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.operators.sensors import BaseSensorOperator from airflow.utils import apply_defaults from airflow.exceptions import AirflowException @@ -23,7 +20,7 @@ class EmrBaseSensor(BaseSensorOperator): """ Contains general sensor behavior for EMR. Subclasses should implement get_emr_response() and state_from_response() methods. - Subclasses should also implment NON_TERMINAL_STATES and FAILED_STATE constants. + Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE constants. 
""" ui_color = '#66c3ff' @@ -39,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator): response = self.get_emr_response() if not response['ResponseMetadata']['HTTPStatusCode'] == 200: - logging.info('Bad HTTP response: %s' % response) + self.logger.info('Bad HTTP response: %s', response) return False state = self.state_from_response(response) - logging.info('Job flow currently %s' % state) + self.logger.info('Job flow currently %s', state) if state in self.NON_TERMINAL_STATES: return False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_job_flow_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py index 662b3b8..e5610a1 100644 --- a/airflow/contrib/sensors/emr_job_flow_sensor.py +++ b/airflow/contrib/sensors/emr_job_flow_sensor.py @@ -11,10 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - -import logging - from airflow.contrib.hooks.emr_hook import EmrHook from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor from airflow.utils import apply_defaults @@ -45,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor): def get_emr_response(self): emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn() - logging.info('Poking cluster %s' % self.job_flow_id) + self.logger.info('Poking cluster %s', self.job_flow_id) return emr.describe_cluster(ClusterId=self.job_flow_id) def state_from_response(self, response): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py index 4cc6bc4..e131d77 100644 --- a/airflow/contrib/sensors/emr_step_sensor.py +++ b/airflow/contrib/sensors/emr_step_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.emr_hook import EmrHook from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor from airflow.utils import apply_defaults @@ -48,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor): def get_emr_response(self): emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn() - logging.info('Poking step {0} on cluster {1}'.format(self.step_id, self.job_flow_id)) + self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id) return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id) def state_from_response(self, response): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/ftp_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py index 4a9428b..2e604e9 100644 --- a/airflow/contrib/sensors/ftp_sensor.py +++ b/airflow/contrib/sensors/ftp_sensor.py @@ -11,9 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- import ftplib -import logging from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook from airflow.operators.sensors import BaseSensorOperator @@ -44,7 +42,7 @@ class FTPSensor(BaseSensorOperator): def poke(self, context): with self._create_hook() as hook: - logging.info('Poking for %s', self.path) + self.logger.info('Poking for %s', self.path) try: hook.get_mod_time(self.path) except ftplib.error_perm as e: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/gcs_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py index c9d741b..800c1bd 100644 --- a/airflow/contrib/sensors/gcs_sensor.py +++ b/airflow/contrib/sensors/gcs_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.operators.sensors import BaseSensorOperator from airflow.utils.decorators import apply_defaults @@ -57,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator): self.delegate_to = delegate_to def poke(self, context): - logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object) + self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object) hook = GoogleCloudStorageHook( google_cloud_storage_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to) @@ -119,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator): self.delegate_to = delegate_to def poke(self, context): - logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object) + self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object) hook = GoogleCloudStorageHook( google_cloud_storage_conn_id=self.google_cloud_conn_id, delegate_to=self.delegate_to) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/hdfs_sensors.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py index 4e9bb9b..11e8b07 100644 --- a/airflow/contrib/sensors/hdfs_sensors.py +++ b/airflow/contrib/sensors/hdfs_sensors.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from airflow.operators.sensors import HdfsSensor -import logging class HdfsSensorRegex(HdfsSensor): @@ -29,9 +28,9 @@ class HdfsSensorRegex(HdfsSensor): :return: Bool depending on the search criteria """ sb = self.hook(self.hdfs_conn_id).get_conn() - logging.getLogger("snakebite").setLevel(logging.WARNING) - logging.info( - 'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())) + self.logger.info( + 'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals()) + ) result = [f for f in sb.ls([self.filepath], include_toplevel=False) if f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))] result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) @@ -53,15 +52,14 @@ class HdfsSensorFolder(HdfsSensor): :return: Bool depending on the search criteria """ sb = self.hook(self.hdfs_conn_id).get_conn() - logging.getLogger("snakebite").setLevel(logging.WARNING) result = [f for f in sb.ls([self.filepath], include_toplevel=True)] result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) result = self.filter_for_filesize(result, self.file_size) if self.be_empty: - logging.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals())) + self.logger.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals())) return len(result) == 1 and result[0]['path'] == self.filepath else: - logging.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals())) + self.logger.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals())) result.pop(0) return bool(result) and result[0]['file_type'] == 'f' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/jira_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py index 708caad..4cbc676 100644 --- a/airflow/contrib/sensors/jira_sensor.py +++ b/airflow/contrib/sensors/jira_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging - from jira.resources import Resource from airflow.contrib.operators.jira_operator import JIRAError @@ -100,8 +97,7 @@ class JiraTicketSensor(JiraSensor): *args, **kwargs) def poke(self, context): - logging.info('Jira Sensor checking for change in ticket : {0}' - .format(self.ticket_id)) + self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id) self.jira_operator.method_name = "issue" self.jira_operator.jira_method_args = { @@ -127,20 +123,19 @@ class JiraTicketSensor(JiraSensor): and getattr(field_value, 'name'): result = self.expected_value.lower() == field_value.name.lower() else: - logging.warning("not implemented checker for issue field {0} " - "which is neither string nor list nor " - "jira Resource".format(self.field)) + self.logger.warning( + "Not implemented checker for issue field %s which " + "is neither string nor list nor Jira Resource", + self.field + ) except JIRAError as jira_error: - logging.error("jira error while checking with expected value: {0}" - .format(jira_error)) + self.logger.error("Jira error while checking with expected value: %s", jira_error) except Exception as e: - logging.error("error while checking with expected value {0}, error: {1}" - .format(self.expected_value, e)) + self.logger.error("Error while checking with expected value %s:", self.expected_value) + self.logger.exception(e) if result is True: - logging.info("issue field {0} has expected value {1}, returning success" - .format(self.field, self.expected_value)) + self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value) else: - logging.info("issue field {0} dont have expected value {1} yet." - .format(self.field, self.expected_value)) + self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value) return result http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/redis_key_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py index 4cab407..220d766 100644 --- a/airflow/contrib/sensors/redis_key_sensor.py +++ b/airflow/contrib/sensors/redis_key_sensor.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.contrib.hooks.redis_hook import RedisHook from airflow.operators.sensors import BaseSensorOperator from airflow.utils.decorators import apply_defaults @@ -37,7 +34,6 @@ class RedisKeySensor(BaseSensorOperator): :type redis_conn_id: string """ super(RedisKeySensor, self).__init__(*args, **kwargs) - self.logger = logging.getLogger(__name__) self.redis_conn_id = redis_conn_id self.key = key http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/wasb_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py index 3f3d56c..1a54e12 100644 --- a/airflow/contrib/sensors/wasb_sensor.py +++ b/airflow/contrib/sensors/wasb_sensor.py @@ -12,9 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - -import logging - from airflow.contrib.hooks.wasb_hook import WasbHook from airflow.operators.sensors import BaseSensorOperator from airflow.utils.decorators import apply_defaults @@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults class WasbBlobSensor(BaseSensorOperator): """ Waits for a blob to arrive on Azure Blob Storage. - + :param container_name: Name of the container. :type container_name: str :param blob_name: Name of the blob. @@ -50,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator): self.check_options = check_options def poke(self, context): - logging.info( + self.logger.info( 'Poking for blob: {self.blob_name}\n' 'in wasb://{self.container_name}'.format(**locals()) ) @@ -62,7 +59,7 @@ class WasbBlobSensor(BaseSensorOperator): class WasbPrefixSensor(BaseSensorOperator): """ Waits for blobs matching a prefix to arrive on Azure Blob Storage. - + :param container_name: Name of the container. :type container_name: str :param prefix: Prefix of the blob. @@ -88,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator): self.check_options = check_options def poke(self, context): - logging.info( + self.logger.info( 'Poking for prefix: {self.prefix}\n' 'in wasb://{self.container_name}'.format(**locals()) ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/task_runner/cgroup_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py index 11c45c1..5d2518d 100644 --- a/airflow/contrib/task_runner/cgroup_task_runner.py +++ b/airflow/contrib/task_runner/cgroup_task_runner.py @@ -14,7 +14,6 @@ import datetime import getpass -import subprocess import os import uuid @@ -73,13 +72,13 @@ class CgroupTaskRunner(BaseTaskRunner): for path_element in path_split: name_to_node = {x.name: x for x in node.children} if path_element not in name_to_node: - self.logger.debug("Creating cgroup {} in {}" - .format(path_element, node.path)) + self.logger.debug("Creating cgroup %s in %s", path_element, node.path) node = node.create_cgroup(path_element) else: - self.logger.debug("Not creating cgroup {} in {} " - "since it already exists" - .format(path_element, node.path)) + self.logger.debug( + "Not creating cgroup %s in %s since it already exists", + path_element, node.path + ) node = name_to_node[path_element] return node @@ -95,24 +94,23 @@ class CgroupTaskRunner(BaseTaskRunner): for path_element in path_split: name_to_node = {x.name: x for x in node.children} if path_element not in name_to_node: - self.logger.warning("Cgroup does not exist: {}" - .format(path)) + self.logger.warning("Cgroup does not exist: %s", path) return else: node = name_to_node[path_element] # node is now the leaf node parent = node.parent - self.logger.debug("Deleting cgroup {}/{}".format(parent, node.name)) + self.logger.debug("Deleting cgroup %s/%s", parent, node.name) parent.delete_cgroup(node.name) def start(self): # Use bash if it's already in a cgroup cgroups = self._get_cgroup_names() if cgroups["cpu"] != "/" or cgroups["memory"] != "/": - self.logger.debug("Already running in a cgroup (cpu: {} memory: {} so " - "not creating another one" - .format(cgroups.get("cpu"), - cgroups.get("memory"))) + self.logger.debug( + "Already running in a cgroup (cpu: %s memory: %s) so not creating another one", + cgroups.get("cpu"), cgroups.get("memory") + ) self.process = self.run_command(['bash', '-c'], join_args=True) return @@ -135,21 +133,27 @@ 
class CgroupTaskRunner(BaseTaskRunner): mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name) self._created_mem_cgroup = True if self._mem_mb_limit > 0: - self.logger.debug("Setting {} with {} MB of memory" - .format(self.mem_cgroup_name, self._mem_mb_limit)) + self.logger.debug( + "Setting %s with %s MB of memory", + self.mem_cgroup_name, self._mem_mb_limit + ) mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit * 1024 * 1024 # Create the CPU cgroup cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name) self._created_cpu_cgroup = True if self._cpu_shares > 0: - self.logger.debug("Setting {} with {} CPU shares" - .format(self.cpu_cgroup_name, self._cpu_shares)) + self.logger.debug( + "Setting %s with %s CPU shares", + self.cpu_cgroup_name, self._cpu_shares + ) cpu_cgroup_node.controller.shares = self._cpu_shares # Start the process w/ cgroups - self.logger.debug("Starting task process with cgroups cpu,memory:{}" - .format(cgroup_name)) + self.logger.debug( + "Starting task process with cgroups cpu,memory: %s", + cgroup_name + ) self.process = self.run_command( ['cgexec', '-g', 'cpu,memory:{}'.format(cgroup_name)] ) @@ -165,10 +169,9 @@ class CgroupTaskRunner(BaseTaskRunner): # we might want to revisit that approach at some other point. if return_code == 137: self.logger.warning("Task failed with return code of 137. This may indicate " - "that it was killed due to excessive memory usage. " - "Please consider optimizing your task or using the " - "resources argument to reserve more memory for your " - "task") + "that it was killed due to excessive memory usage. " + "Please consider optimizing your task or using the " + "resources argument to reserve more memory for your task") return return_code def terminate(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index a8cb087..7812f96 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging import sys from airflow import configuration @@ -21,6 +19,7 @@ from airflow.executors.local_executor import LocalExecutor from airflow.executors.sequential_executor import SequentialExecutor from airflow.exceptions import AirflowException +from airflow.utils.log.LoggingMixin import LoggingMixin DEFAULT_EXECUTOR = None @@ -42,14 +41,15 @@ def GetDefaultExecutor(): DEFAULT_EXECUTOR = _get_executor(executor_name) - logging.info("Using executor " + executor_name) + log = LoggingMixin().logger + log.info("Using executor %s", executor_name) return DEFAULT_EXECUTOR def _get_executor(executor_name): """ - Creates a new instance of the named executor. In case the executor name is not know in airflow, + Creates a new instance of the named executor. 
In case the executor name is not know in airflow, look for it in the plugins """ if executor_name == 'LocalExecutor': http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7a4065e..1197958 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -11,12 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from builtins import range from airflow import configuration +from airflow.utils.log.LoggingMixin import LoggingMixin from airflow.utils.state import State -from airflow.utils.logging import LoggingMixin PARALLELISM = configuration.getint('core', 'PARALLELISM') @@ -47,7 +46,7 @@ class BaseExecutor(LoggingMixin): def queue_command(self, task_instance, command, priority=1, queue=None): key = task_instance.key if key not in self.queued_tasks and key not in self.running: - self.logger.info("Adding to queue: {}".format(command)) + self.logger.info("Adding to queue: %s", command) self.queued_tasks[key] = (command, priority, queue, task_instance) def queue_task_instance( @@ -100,9 +99,9 @@ class BaseExecutor(LoggingMixin): else: open_slots = self.parallelism - len(self.running) - self.logger.debug("{} running task instances".format(len(self.running))) - self.logger.debug("{} in queue".format(len(self.queued_tasks))) - self.logger.debug("{} open slots".format(open_slots)) + self.logger.debug("%s running task instances", len(self.running)) + self.logger.debug("%s in queue", len(self.queued_tasks)) + self.logger.debug("%s open slots", open_slots) sorted_queue = sorted( [(k, v) for k, v in self.queued_tasks.items()], @@ -124,11 +123,12 @@ class BaseExecutor(LoggingMixin): self.execute_async(key, command=command, queue=queue) else: self.logger.debug( - 'Task is already running, not sending to ' - 'executor: {}'.format(key)) + 'Task is already running, not sending to executor: %s', + key + ) # Calling child class sync method - self.logger.debug("Calling the {} sync method".format(self.__class__)) + self.logger.debug("Calling the %s sync method", self.__class__) self.sync() def change_state(self, key, state): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 17c343b..39c895c 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -13,7 +13,6 @@ # limitations under the License. 
from builtins import object -import logging import subprocess import ssl import time @@ -25,6 +24,7 @@ from celery import states as celery_states from airflow.exceptions import AirflowConfigException, AirflowException from airflow.executors.base_executor import BaseExecutor from airflow import configuration +from airflow.utils.log.LoggingMixin import LoggingMixin PARALLELISM = configuration.get('core', 'PARALLELISM') @@ -53,7 +53,8 @@ class CeleryConfig(object): try: celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE') except AirflowConfigException as e: - logging.warning("Celery Executor will run without SSL") + log = LoggingMixin().logger + log.warning("Celery Executor will run without SSL") try: if celery_ssl_active: @@ -75,11 +76,12 @@ app = Celery( @app.task def execute_command(command): - logging.info("Executing command in Celery " + command) + log = LoggingMixin().logger + log.info("Executing command in Celery: %s", command) try: subprocess.check_call(command, shell=True) except subprocess.CalledProcessError as e: - logging.error(e) + log.error(e) raise AirflowException('Celery command failed') @@ -92,22 +94,18 @@ class CeleryExecutor(BaseExecutor): vast amounts of messages, while providing operations with the tools required to maintain such a system. """ - def start(self): self.tasks = {} self.last_state = {} def execute_async(self, key, command, queue=DEFAULT_QUEUE): - self.logger.info( "[celery] queuing {key} through celery, " - "queue={queue}".format(**locals())) + self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( args=[command], queue=queue) self.last_state[key] = celery_states.PENDING def sync(self): - - self.logger.debug( - "Inquiring about {} celery task(s)".format(len(self.tasks))) + self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks)) for key, async in list(self.tasks.items()): try: state = async.state @@ -125,11 +123,11 @@ class CeleryExecutor(BaseExecutor): del self.tasks[key] del self.last_state[key] else: - self.logger.info("Unexpected state: " + async.state) + self.logger.info("Unexpected state: %s", async.state) self.last_state[key] = async.state except Exception as e: - logging.error("Error syncing the celery executor, ignoring " - "it:\n{}\n".format(e, traceback.format_exc())) + self.logger.error("Error syncing the celery executor, ignoring it:") + self.logger.exception(e) def end(self, synchronous=False): if synchronous: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index d65830a..8a56506 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -13,7 +13,6 @@ # limitations under the License. import distributed - import subprocess import warnings @@ -41,8 +40,8 @@ class DaskExecutor(BaseExecutor): def execute_async(self, key, command, queue=None): if queue is not None: warnings.warn( - 'DaskExecutor does not support queues. All tasks will be run ' - 'in the same cluster') + 'DaskExecutor does not support queues. 
All tasks will be run in the same cluster' + ) def airflow_run(): return subprocess.check_call(command, shell=True) @@ -54,12 +53,11 @@ class DaskExecutor(BaseExecutor): if future.done(): key = self.futures[future] if future.exception(): + self.logger.error("Failed to execute task: %s", repr(future.exception())) self.fail(key) - self.logger.error("Failed to execute task: {}".format( - repr(future.exception()))) elif future.cancelled(): - self.fail(key) self.logger.error("Failed to execute task") + self.fail(key) else: self.success(key) self.futures.pop(future)
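The hunks above all apply the same change: direct calls to the logging module are replaced by the self.logger property provided by LoggingMixin (imported from airflow.utils.log.LoggingMixin), and eager str.format() interpolation gives way to lazy %-style arguments. The sketch below illustrates that pattern with the standard library only; this LoggingMixin is a simplified stand-in for Airflow's mixin, and ExampleSensor is a hypothetical class, not code from the patch.

import logging


class LoggingMixin(object):
    """Simplified stand-in for airflow.utils.log.LoggingMixin (an assumption,
    not the real implementation): exposes a lazily created, per-class logger."""

    @property
    def logger(self):
        try:
            return self._logger
        except AttributeError:
            # Name the logger after the concrete class, created on first access.
            name = '.'.join([self.__class__.__module__, self.__class__.__name__])
            self._logger = logging.getLogger(name)
            return self._logger


class ExampleSensor(LoggingMixin):
    """Hypothetical sensor-like class used only to show the call style."""

    def poke(self, path):
        # %-style arguments are passed to the logger instead of a pre-formatted
        # string, so interpolation is deferred until the record is emitted.
        self.logger.info('Poking for %s', path)
        return True


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    ExampleSensor().poke('/tmp/example')

Because the arguments are handed to the logger separately, the string is only interpolated when the record is actually emitted, so a debug-level message costs almost nothing when the effective level is INFO or higher.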
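Module-level functions with no self, such as the Celery execute_command task and GetDefaultExecutor in the executor diffs above, obtain a logger by instantiating the mixin inline: log = LoggingMixin().logger. A minimal sketch of that usage follows, reusing the simplified LoggingMixin stand-in from the previous example; execute_command_example is illustrative and not the actual Celery task from the patch.

import subprocess


def execute_command_example(command):
    # No self here, so instantiate the mixin just to borrow its logger,
    # mirroring the LoggingMixin().logger idiom used in the patch.
    log = LoggingMixin().logger
    log.info('Executing command: %s', command)
    try:
        subprocess.check_call(command, shell=True)
    except subprocess.CalledProcessError as e:
        log.error(e)
        # The patch raises AirflowException at this point; a plain re-raise
        # keeps this sketch dependency-free.
        raise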
