http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index a58c3d7..9730737 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -20,14 +20,13 @@ from builtins import range
 
 from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.log.LoggingMixin import LoggingMixin
 from airflow.utils.state import State
-from airflow.utils.logging import LoggingMixin
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
 
 class LocalWorker(multiprocessing.Process, LoggingMixin):
-
     def __init__(self, task_queue, result_queue):
         multiprocessing.Process.__init__(self)
         self.task_queue = task_queue
@@ -41,15 +40,15 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
                 # Received poison pill, no more tasks to run
                 self.task_queue.task_done()
                 break
-            self.logger.info("{} running {}".format(
-                self.__class__.__name__, command))
+            self.logger.info("%s running %s", self.__class__.__name__, command)
             command = "exec bash -c '{0}'".format(command)
             try:
                 subprocess.check_call(command, shell=True)
                 state = State.SUCCESS
             except subprocess.CalledProcessError as e:
                 state = State.FAILED
-                self.logger.error("failed to execute task {}:".format(str(e)))
+                self.logger.error("Failed to execute task %s.", str(e))
+                # TODO: Why is this commented out?
                 # raise e
             self.result_queue.put((key, state))
             self.task_queue.task_done()
@@ -68,7 +67,7 @@ class LocalExecutor(BaseExecutor):
         self.result_queue = multiprocessing.Queue()
         self.workers = [
             LocalWorker(self.queue, self.result_queue)
-            for i in range(self.parallelism)
+            for _ in range(self.parallelism)
         ]
 
         for w in self.workers:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py 
b/airflow/executors/sequential_executor.py
index 43180cc..7d08a4b 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -37,14 +37,14 @@ class SequentialExecutor(BaseExecutor):
 
     def sync(self):
         for key, command in self.commands_to_run:
-            self.logger.info("Executing command: {}".format(command))
+            self.logger.info("Executing command: %s", command)
 
             try:
                 subprocess.check_call(command, shell=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
-                self.logger.error("Failed to execute task {}:".format(str(e)))
+                self.logger.error("Failed to execute task %s.", str(e))
 
         self.commands_to_run = []
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index caaa575..2f7e6ee 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -13,9 +13,12 @@
 # limitations under the License.
 
 from __future__ import division
+
 from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 standard_library.install_aliases()
-import logging
 import re
 import fnmatch
 import configparser
@@ -27,8 +30,8 @@ import warnings
 import boto
 from boto.s3.connection import S3Connection, NoHostProvided
 from boto.sts import STSConnection
+
 boto.set_stream_logger('boto')
-logging.getLogger("boto").setLevel(logging.INFO)
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -84,7 +87,8 @@ def _parse_s3_config(config_file_name, config_format='boto', 
profile=None):
             if Config.has_option(cred_section, 'calling_format'):
                 calling_format = Config.get(cred_section, 'calling_format')
         except:
-            logging.warning("Option Error in parsing s3 config file")
+            log = LoggingMixin().logger
+            log.warning("Option Error in parsing s3 config file")
             raise
         return (access_key, secret_key, calling_format)
 
@@ -94,8 +98,8 @@ class S3Hook(BaseHook):
     Interact with S3. This class is a wrapper around the boto library.
     """
     def __init__(
-            self,
-            s3_conn_id='s3_default'):
+        self,
+        s3_conn_id='s3_default'):
         self.s3_conn_id = s3_conn_id
         self.s3_conn = self.get_connection(s3_conn_id)
         self.extra_params = self.s3_conn.extra_dejson
@@ -164,8 +168,8 @@ class S3Hook(BaseHook):
         a_key = s_key = None
         if self._creds_in_config_file:
             a_key, s_key, calling_format = 
_parse_s3_config(self.s3_config_file,
-                                                self.s3_config_format,
-                                                self.profile)
+                                                            
self.s3_config_format,
+                                                            self.profile)
         elif self._creds_in_conn:
             a_key = self._a_key
             s_key = self._s_key
@@ -185,14 +189,14 @@ class S3Hook(BaseHook):
             assumed_role_object = sts_connection.assume_role(
                 role_arn=self.role_arn,
                 role_session_name="Airflow_" + self.s3_conn_id
-                )
+            )
             creds = assumed_role_object.credentials
             connection = S3Connection(
                 aws_access_key_id=creds.access_key,
                 aws_secret_access_key=creds.secret_key,
                 calling_format=calling_format,
                 security_token=creds.session_token
-                )
+            )
         else:
             connection = S3Connection(aws_access_key_id=a_key,
                                       aws_secret_access_key=s_key,
@@ -323,13 +327,13 @@ class S3Hook(BaseHook):
         return False if plist is None else prefix in plist
 
     def load_file(
-            self,
-            filename,
-            key,
-            bucket_name=None,
-            replace=False,
-            multipart_bytes=5 * (1024 ** 3),
-            encrypt=False):
+        self,
+        filename,
+        key,
+        bucket_name=None,
+        replace=False,
+        multipart_bytes=5 * (1024 ** 3),
+        encrypt=False):
         """
         Loads a local file to S3
 
@@ -373,10 +377,8 @@ class S3Hook(BaseHook):
                 for chunk in range(total_chunks):
                     offset = chunk * multipart_bytes
                     bytes = min(multipart_bytes, key_size - offset)
-                    with FileChunkIO(
-                            filename, 'r', offset=offset, bytes=bytes) as fp:
-                        logging.info('Sending chunk {c} of {tc}...'.format(
-                            c=chunk + 1, tc=total_chunks))
+                    with FileChunkIO(filename, 'r', offset=offset, 
bytes=bytes) as fp:
+                        self.logger.info('Sending chunk %s of %s...', chunk + 
1, total_chunks)
                         mp.upload_part_from_file(fp, part_num=chunk + 1)
             except:
                 mp.cancel_upload()
@@ -389,8 +391,9 @@ class S3Hook(BaseHook):
             key_size = key_obj.set_contents_from_filename(filename,
                                                           replace=replace,
                                                           encrypt_key=encrypt)
-        logging.info("The key {key} now contains"
-                     " {key_size} bytes".format(**locals()))
+        self.logger.info(
+            "The key {key} now contains {key_size} bytes".format(**locals())
+        )
 
     def load_string(self, string_data,
                     key, bucket_name=None,
@@ -429,5 +432,6 @@ class S3Hook(BaseHook):
         key_size = key_obj.set_contents_from_string(string_data,
                                                     replace=replace,
                                                     encrypt_key=encrypt)
-        logging.info("The key {key} now contains"
-                     " {key_size} bytes".format(**locals()))
+        self.logger.info(
+            "The key {key} now contains {key_size} bytes".format(**locals())
+        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index cef8c97..4617b98 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -17,19 +17,18 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from builtins import object
-import logging
 import os
 import random
 
 from airflow import settings
 from airflow.models import Connection
 from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
 
 
-class BaseHook(object):
+class BaseHook(LoggingMixin):
     """
     Abstract base class for hooks, hooks are meant as an interface to
     interact with external systems. MySqlHook, HiveHook, PigHook return
@@ -40,6 +39,7 @@ class BaseHook(object):
     def __init__(self, source):
         pass
 
+
     @classmethod
     def _get_connections_from_db(cls, conn_id):
         session = settings.Session()
@@ -76,7 +76,8 @@ class BaseHook(object):
     def get_connection(cls, conn_id):
         conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
-            logging.info("Using connection to: " + conn.host)
+            log = LoggingMixin().logger
+            log.info("Using connection to: %s", conn.host)
         return conn
 
     @classmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index fe85153..85eebd0 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -16,8 +16,6 @@ from builtins import str
 from past.builtins import basestring
 from datetime import datetime
 from contextlib import closing
-import numpy
-import logging
 import sys
 
 from sqlalchemy import create_engine
@@ -88,8 +86,8 @@ class DbApiHook(BaseHook):
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
         import pandas.io.sql as psql
-        
-        with closing(self.get_conn()) as conn:        
+
+        with closing(self.get_conn()) as conn:
             return psql.read_sql(sql, con=conn, params=parameters)
 
     def get_records(self, sql, parameters=None):
@@ -104,7 +102,7 @@ class DbApiHook(BaseHook):
         """
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
-            
+
         with closing(self.get_conn()) as conn:
             with closing(conn.cursor()) as cur:
                 if parameters is not None:
@@ -125,7 +123,7 @@ class DbApiHook(BaseHook):
         """
         if sys.version_info[0] < 3:
             sql = sql.encode('utf-8')
-        
+
         with closing(self.get_conn()) as conn:
             with closing(conn.cursor()) as cur:
                 if parameters is not None:
@@ -151,21 +149,21 @@ class DbApiHook(BaseHook):
         """
         if isinstance(sql, basestring):
             sql = [sql]
-        
+
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, autocommit)
-            
+
             with closing(conn.cursor()) as cur:
                 for s in sql:
                     if sys.version_info[0] < 3:
                         s = s.encode('utf-8')
-                    logging.info(s)
+                    self.logger.info(s)
                     if parameters is not None:
                         cur.execute(s, parameters)
                     else:
                         cur.execute(s)
-            
+
             conn.commit()
 
     def set_autocommit(self, conn, autocommit):
@@ -197,13 +195,13 @@ class DbApiHook(BaseHook):
             target_fields = "({})".format(target_fields)
         else:
             target_fields = ''
-            
+
         with closing(self.get_conn()) as conn:
             if self.supports_autocommit:
                 self.set_autocommit(conn, False)
-            
+
             conn.commit()
-            
+
             with closing(conn.cursor()) as cur:
                 for i, row in enumerate(rows, 1):
                     l = []
@@ -218,11 +216,12 @@ class DbApiHook(BaseHook):
                     cur.execute(sql, values)
                     if commit_every and i % commit_every == 0:
                         conn.commit()
-                        logging.info(
-                            "Loaded {i} into {table} rows so 
far".format(**locals()))
-            
+                        self.logger.info(
+                            "Loaded {i} into {table} rows so 
far".format(**locals())
+                        )
+
             conn.commit()
-        logging.info(
+        self.logger.info(
             "Done loading. Loaded a total of {i} rows".format(**locals()))
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 350c230..af3ae9b 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -14,7 +14,6 @@
 
 from __future__ import print_function
 
-import logging
 import requests
 import time
 
@@ -33,7 +32,6 @@ class DruidHook(BaseHook):
     :param max_ingestion_time: The maximum ingestion time before assuming the 
job failed
     :type max_ingestion_time: int
     """
-
     def __init__(
             self,
             druid_ingest_conn_id='druid_ingest_default',
@@ -70,12 +68,12 @@ class DruidHook(BaseHook):
         while running:
             req_status = requests.get("{0}/{1}/status".format(url, 
druid_task_id))
 
-            logging.info("Job still running for {0} seconds...".format(sec))
+            self.logger.info("Job still running for %s seconds...", sec)
 
             sec = sec + 1
 
             if sec > self.max_ingestion_time:
-                raise AirflowException('Druid ingestion took more than {} 
seconds'.format(self.max_ingestion_time))
+                raise AirflowException('Druid ingestion took more than {} seconds'.format(self.max_ingestion_time))
 
             time.sleep(self.timeout)
 
@@ -87,6 +85,6 @@ class DruidHook(BaseHook):
             elif status == 'FAILED':
                 raise AirflowException('Druid indexing job failed, check 
console for more info')
             else:
-                raise AirflowException('Could not get status of the job, got 
{0}'.format(status))
+                raise AirflowException('Could not get status of the job, got {}'.format(status))
 
-        logging.info('Successful index')
+        self.logger.info('Successful index')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index d120769..70d7642 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -17,10 +17,8 @@ from __future__ import print_function
 from builtins import zip
 from past.builtins import basestring
 
-import collections
 import unicodecsv as csv
 import itertools
-import logging
 import re
 import subprocess
 import time
@@ -38,7 +36,6 @@ HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 
'LOW', 'VERY_LOW']
 
 
 class HiveCliHook(BaseHook):
-
     """Simple wrapper around the hive CLI.
 
     It also supports the ``beeline``
@@ -204,7 +201,7 @@ class HiveCliHook(BaseHook):
                 hive_cmd.extend(['-f', f.name])
 
                 if verbose:
-                    logging.info(" ".join(hive_cmd))
+                    self.logger.info(" ".join(hive_cmd))
                 sp = subprocess.Popen(
                     hive_cmd,
                     stdout=subprocess.PIPE,
@@ -218,7 +215,7 @@ class HiveCliHook(BaseHook):
                         break
                     stdout += line.decode('UTF-8')
                     if verbose:
-                        logging.info(line.decode('UTF-8').strip())
+                        self.logger.info(line.decode('UTF-8').strip())
                 sp.wait()
 
                 if sp.returncode:
@@ -249,7 +246,7 @@ class HiveCliHook(BaseHook):
             for query in query_set:
 
                 query_preview = ' '.join(query.split())[:50]
-                logging.info("Testing HQL [{0} (...)]".format(query_preview))
+                self.logger.info("Testing HQL [%s (...)]", query_preview)
                 if query_set == insert:
                     query = other + '; explain ' + query
                 else:
@@ -258,16 +255,16 @@ class HiveCliHook(BaseHook):
                     self.run_cli(query, verbose=False)
                 except AirflowException as e:
                     message = e.args[0].split('\n')[-2]
-                    logging.info(message)
+                    self.logger.info(message)
                     error_loc = re.search('(\d+):(\d+)', message)
                     if error_loc and error_loc.group(1).isdigit():
                         l = int(error_loc.group(1))
                         begin = max(l-2, 0)
                         end = min(l+3, len(query.split('\n')))
                         context = '\n'.join(query.split('\n')[begin:end])
-                        logging.info("Context :\n {0}".format(context))
+                        self.logger.info("Context :\n %s", context)
                 else:
-                    logging.info("SUCCESS")
+                    self.logger.info("SUCCESS")
 
     def load_df(
             self,
@@ -356,7 +353,7 @@ class HiveCliHook(BaseHook):
         final destination using a ``HiveOperator``.
 
         :param filepath: local filepath of the file to load
-        :type filepath: str       
+        :type filepath: str
         :param table: target Hive table, use dot notation to target a
             specific database
         :type table: str
@@ -398,9 +395,9 @@ class HiveCliHook(BaseHook):
                 tprops = ", ".join(
                     ["'{0}'='{1}'".format(k, v) for k, v in 
tblproperties.items()])
                 hql += "TBLPROPERTIES({tprops})\n"
-        hql += ";" 
+        hql += ";"
         hql = hql.format(**locals())
-        logging.info(hql)
+        self.logger.info(hql)
         self.run_cli(hql)
         hql = "LOAD DATA LOCAL INPATH '{filepath}' "
         if overwrite:
@@ -411,7 +408,7 @@ class HiveCliHook(BaseHook):
                 ["{0}='{1}'".format(k, v) for k, v in partition.items()])
             hql += "PARTITION ({pvals});"
         hql = hql.format(**locals())
-        logging.info(hql)
+        self.logger.info(hql)
         self.run_cli(hql)
 
     def kill(self):
@@ -665,8 +662,10 @@ class HiveServer2Hook(BaseHook):
 
         # impyla uses GSSAPI instead of KERBEROS as a auth_mechanism identifier
         if auth_mechanism == 'KERBEROS':
-            logging.warning("Detected deprecated 'KERBEROS' for authMechanism 
for %s. Please use 'GSSAPI' instead",
-                            self.hiveserver2_conn_id)
+            self.logger.warning(
+                "Detected deprecated 'KERBEROS' for authMechanism for %s. 
Please use 'GSSAPI' instead",
+                self.hiveserver2_conn_id
+            )
             auth_mechanism = 'GSSAPI'
 
         from impala.dbapi import connect
@@ -697,7 +696,7 @@ class HiveServer2Hook(BaseHook):
                     # may be `SET` or DDL
                     records = cur.fetchall()
                 except ProgrammingError:
-                    logging.debug("get_results returned no records")
+                    self.logger.debug("get_results returned no records")
                 if records:
                     results = {
                         'data': records,
@@ -717,7 +716,7 @@ class HiveServer2Hook(BaseHook):
         schema = schema or 'default'
         with self.get_conn(schema) as conn:
             with conn.cursor() as cur:
-                logging.info("Running query: " + hql)
+                self.logger.info("Running query: %s", hql)
                 cur.execute(hql)
                 schema = cur.description
                 with open(csv_filepath, 'wb') as f:
@@ -735,8 +734,8 @@ class HiveServer2Hook(BaseHook):
 
                         writer.writerows(rows)
                         i += len(rows)
-                        logging.info("Written {0} rows so far.".format(i))
-                    logging.info("Done. Loaded a total of {0} rows.".format(i))
+                        self.logger.info("Written %s rows so far.", i)
+                    self.logger.info("Done. Loaded a total of %s rows.", i)
 
     def get_records(self, hql, schema='default'):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/http_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py
index a144143..f168bc8 100644
--- a/airflow/hooks/http_hook.py
+++ b/airflow/hooks/http_hook.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 from builtins import str
-import logging
 
 import requests
 
@@ -83,7 +82,7 @@ class HttpHook(BaseHook):
                                    headers=headers)
 
         prepped_request = session.prepare_request(req)
-        logging.info("Sending '" + self.method + "' to url: " + url)
+        self.logger.info("Sending '%s' to url: %s", self.method, url)
         return self.run_and_check(session, prepped_request, extra_options)
 
     def run_and_check(self, session, prepped_request, extra_options):
@@ -108,12 +107,12 @@ class HttpHook(BaseHook):
             # Tried rewrapping, but not supported. This way, it's possible
             # to get reason and code for failure by checking first 3 chars
             # for the code, or do a split on ':'
-            logging.error("HTTP error: " + response.reason)
+            self.logger.error("HTTP error: %s", response.reason)
             if self.method not in ('GET', 'HEAD'):
                 # The sensor uses GET, so this prevents filling up the log
                 # with the body every time the GET 'misses'.
                 # That's ok to do, because GETs should be repeatable and
                 # all data should be visible in the log (no post data)
-                logging.error(response.text)
+                self.logger.error(response.text)
             raise 
AirflowException(str(response.status_code)+":"+response.reason)
         return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/oracle_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py
index 1386d6d..f439daa 100644
--- a/airflow/hooks/oracle_hook.py
+++ b/airflow/hooks/oracle_hook.py
@@ -19,7 +19,6 @@ from builtins import str
 from past.builtins import basestring
 from datetime import datetime
 import numpy
-import logging
 
 
 class OracleHook(DbApiHook):
@@ -102,11 +101,11 @@ class OracleHook(DbApiHook):
             cur.execute(sql)
             if i % commit_every == 0:
                 conn.commit()
-                logging.info('Loaded {i} into {table} rows so 
far'.format(**locals()))
+                self.logger.info('Loaded {i} into {table} rows so 
far'.format(**locals()))
         conn.commit()
         cur.close()
         conn.close()
-        logging.info('Done loading. Loaded a total of {i} 
rows'.format(**locals()))
+        self.logger.info('Done loading. Loaded a total of {i} 
rows'.format(**locals()))
 
     def bulk_insert_rows(self, table, rows, target_fields=None, 
commit_every=5000):
         """A performant bulk insert for cx_Oracle that uses prepared 
statements via `executemany()`.
@@ -130,13 +129,13 @@ class OracleHook(DbApiHook):
                 cursor.prepare(prepared_stm)
                 cursor.executemany(None, row_chunk)
                 conn.commit()
-                logging.info('[%s] inserted %s rows', table, row_count)
+                self.logger.info('[%s] inserted %s rows', table, row_count)
                 # Empty chunk
                 row_chunk = []
         # Commit the leftover chunk
         cursor.prepare(prepared_stm)
         cursor.executemany(None, row_chunk)
         conn.commit()
-        logging.info('[%s] inserted %s rows', table, row_count)
+        self.logger.info('[%s] inserted %s rows', table, row_count)
         cursor.close()
         conn.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 7201b9f..29beb54 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -13,14 +13,12 @@
 # limitations under the License.
 
 from __future__ import print_function
-import logging
 import subprocess
 from tempfile import NamedTemporaryFile
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.file import TemporaryDirectory
-from airflow import configuration
 
 
 class PigCliHook(BaseHook):
@@ -64,7 +62,7 @@ class PigCliHook(BaseHook):
                     pig_properties_list = self.pig_properties.split()
                     pig_cmd.extend(pig_properties_list)
                 if verbose:
-                    logging.info(" ".join(pig_cmd))
+                    self.logger.info(" ".join(pig_cmd))
                 sp = subprocess.Popen(
                     pig_cmd,
                     stdout=subprocess.PIPE,
@@ -75,7 +73,7 @@ class PigCliHook(BaseHook):
                 for line in iter(sp.stdout.readline, ''):
                     stdout += line
                     if verbose:
-                        logging.info(line.strip())
+                        self.logger.info(line.strip())
                 sp.wait()
 
                 if sp.returncode:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/presto_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/presto_hook.py b/airflow/hooks/presto_hook.py
index 768ff3f..b9f30a6 100644
--- a/airflow/hooks/presto_hook.py
+++ b/airflow/hooks/presto_hook.py
@@ -13,16 +13,12 @@
 # limitations under the License.
 
 from builtins import str
-import logging
 
 from pyhive import presto
 from pyhive.exc import DatabaseError
 
 from airflow.hooks.dbapi_hook import DbApiHook
 
-logging.getLogger("pyhive").setLevel(logging.INFO)
-
-
 class PrestoException(Exception):
     pass
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index 5e2a28d..e7df328 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -14,16 +14,18 @@
 
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
-import logging
 
 from hdfs import InsecureClient, HdfsError
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 _kerberos_security_mode = configuration.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
     try:
         from hdfs.ext.kerberos import KerberosClient
     except ImportError:
-        logging.error("Could not load the Kerberos extension for the 
WebHDFSHook.")
+        log = LoggingMixin().logger
+        log.error("Could not load the Kerberos extension for the WebHDFSHook.")
         raise
 from airflow.exceptions import AirflowException
 
@@ -47,7 +49,7 @@ class WebHDFSHook(BaseHook):
         nn_connections = self.get_connections(self.webhdfs_conn_id)
         for nn in nn_connections:
             try:
-                logging.debug('Trying namenode {}'.format(nn.host))
+                self.logger.debug('Trying namenode %s', nn.host)
                 connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
                 if _kerberos_security_mode:
                     client = KerberosClient(connection_str)
@@ -55,11 +57,12 @@ class WebHDFSHook(BaseHook):
                     proxy_user = self.proxy_user or nn.login
                     client = InsecureClient(connection_str, user=proxy_user)
                 client.status('/')
-                logging.debug('Using namenode {} for hook'.format(nn.host))
+                self.logger.debug('Using namenode %s for hook', nn.host)
                 return client
             except HdfsError as e:
-                logging.debug("Read operation on namenode {nn.host} failed 
with"
-                              " error: {e.message}".format(**locals()))
+                self.logger.debug(
+                    "Read operation on namenode {nn.host} failed with error: {e.message}".format(**locals())
+                )
         nn_hosts = [c.host for c in nn_connections]
         no_nn_error = "Read operations failed on the namenodes 
below:\n{}".format("\n".join(nn_hosts))
         raise AirflowWebHDFSHookException(no_nn_error)
@@ -98,4 +101,4 @@ class WebHDFSHook(BaseHook):
                  overwrite=overwrite,
                  n_threads=parallelism,
                  **kwargs)
-        logging.debug("Uploaded file {} to {}".format(source, destination))
+        self.logger.debug("Uploaded file %s to %s", source, destination)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
index 907d1e8..4634b22 100644
--- a/airflow/hooks/zendesk_hook.py
+++ b/airflow/hooks/zendesk_hook.py
@@ -12,19 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-
-"""
-A hook to talk to Zendesk
-"""
-
-import logging
 import time
 from zdesk import Zendesk, RateLimitError, ZendeskError
 from airflow.hooks.base_hook import BaseHook
 
 
 class ZendeskHook(BaseHook):
+    """
+    A hook to talk to Zendesk
+    """
     def __init__(self, zendesk_conn_id):
         self.__zendesk_conn_id = zendesk_conn_id
         self.__url = None
@@ -41,10 +37,10 @@ class ZendeskHook(BaseHook):
         """
         retry_after = int(
             rate_limit_exception.response.headers.get('Retry-After', 60))
-        logging.info(
-            "Hit Zendesk API rate limit. Pausing for {} "
-            "seconds".format(
-                retry_after))
+        self.logger.info(
+            "Hit Zendesk API rate limit. Pausing for %s seconds",
+            retry_after
+        )
         time.sleep(retry_after)
 
     def call(self, path, query=None, get_all_pages=True):
@@ -79,7 +75,7 @@ class ZendeskHook(BaseHook):
                     # `github.zendesk...`
                     # in it, but the call function needs it removed.
                     next_url = next_page.split(self.__url)[1]
-                    logging.info("Calling {}".format(next_url))
+                    self.logger.info("Calling %s", next_url)
                     more_res = zendesk.call(next_url)
                     results.extend(more_res[key])
                     if next_page == more_res['next_page']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 138a055..904609c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -17,48 +17,43 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from past.builtins import basestring
-from collections import defaultdict, Counter
-
-from datetime import datetime
-
 import getpass
-import logging
-import socket
 import multiprocessing
 import os
+import psutil
 import signal
 import six
+import socket
 import sys
 import threading
 import time
-from time import sleep
-
-import psutil
+from collections import defaultdict
+from datetime import datetime
+from past.builtins import basestring
 from sqlalchemy import (
     Column, Integer, String, DateTime, func, Index, or_, and_, not_)
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import make_transient
 from tabulate import tabulate
+from time import sleep
 
-from airflow import executors, models, settings
 from airflow import configuration as conf
+from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
-from airflow.utils.state import State
-from airflow.utils.db import provide_session, pessimistic_connection_handling
+from airflow.utils import asciiart
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           DagFileProcessorManager,
                                           SimpleDag,
                                           SimpleDagBag,
                                           list_py_file_paths)
+from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.email import send_email
-from airflow.utils.logging import LoggingMixin
-from airflow.utils import asciiart
-
+from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.state import State
 
 Base = models.Base
 ID_LEN = models.ID_LEN
@@ -276,12 +271,14 @@ class BaseJob(Base, LoggingMixin):
             ["{}".format(x) for x in reset_tis])
         session.commit()
 
-        self.logger.info("Reset the following {} TaskInstances:\n\t{}"
-                         .format(len(reset_tis), task_instance_str))
+        self.logger.info(
+            "Reset the following %s TaskInstances:\n\t%s",
+            len(reset_tis), task_instance_str
+        )
         return reset_tis
 
 
-class DagFileProcessor(AbstractDagFileProcessor):
+class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
     """Helps call SchedulerJob.process_file() in a separate process."""
 
     # Counter that increments everytime an instance of this class is created
@@ -361,6 +358,8 @@ class DagFileProcessor(AbstractDagFileProcessor):
             # responsive file tailing
             parent_dir, _ = os.path.split(log_file)
 
+            _log = LoggingMixin().logger
+
             # Create the parent directory for the log file if necessary.
             if not os.path.isdir(parent_dir):
                 os.makedirs(parent_dir)
@@ -385,7 +384,7 @@ class DagFileProcessor(AbstractDagFileProcessor):
                 threading.current_thread().name = thread_name
                 start_time = time.time()
 
-                logging.info("Started process (PID=%s) to work on %s",
+                _log.info("Started process (PID=%s) to work on %s",
                              os.getpid(),
                              file_path)
                 scheduler_job = SchedulerJob(dag_ids=dag_id_white_list)
@@ -393,12 +392,13 @@ class DagFileProcessor(AbstractDagFileProcessor):
                                                     pickle_dags)
                 result_queue.put(result)
                 end_time = time.time()
-                logging.info("Processing %s took %.3f seconds",
-                             file_path,
-                             end_time - start_time)
+                _log.info(
+                    "Processing %s took %.3f seconds",
+                    file_path, end_time - start_time
+                )
             except:
                 # Log exceptions through the logging framework.
-                logging.exception("Got an exception! Propagating...")
+                _log.exception("Got an exception! Propagating...")
                 raise
             finally:
                 sys.stdout = original_stdout
@@ -438,7 +438,7 @@ class DagFileProcessor(AbstractDagFileProcessor):
         # Arbitrarily wait 5s for the process to die
         self._process.join(5)
         if sigkill and self._process.is_alive():
-            logging.warning("Killing PID %s", self._process.pid)
+            self.logger.warning("Killing PID %s", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
 
     @property
@@ -478,7 +478,7 @@ class DagFileProcessor(AbstractDagFileProcessor):
         if not self._result_queue.empty():
             self._result = self._result_queue.get_nowait()
             self._done = True
-            logging.debug("Waiting for %s", self._process)
+            self.logger.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -488,7 +488,7 @@ class DagFileProcessor(AbstractDagFileProcessor):
             # Get the object from the queue or else join() can hang.
             if not self._result_queue.empty():
                 self._result = self._result_queue.get_nowait()
-            logging.debug("Waiting for %s", self._process)
+            self.logger.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -610,8 +610,10 @@ class SchedulerJob(BaseJob):
         tasks that should have succeeded in the past hour.
         """
         if not any([ti.sla for ti in dag.tasks]):
-            self.logger.info("Skipping SLA check for {} because "
-                             "no tasks in DAG have SLAs".format(dag))
+            self.logger.info(
+                "Skipping SLA check for %s because no tasks in DAG have SLAs",
+                dag
+            )
             return
 
         TI = models.TaskInstance
@@ -841,8 +843,10 @@ class SchedulerJob(BaseJob):
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
                     next_run_date = 
dag.normalize_schedule(min(task_start_dates))
-                    self.logger.debug("Next run date based on tasks {}"
-                                      .format(next_run_date))
+                    self.logger.debug(
+                        "Next run date based on tasks %s",
+                        next_run_date
+                    )
             else:
                 next_run_date = dag.following_schedule(last_scheduled_run)
 
@@ -859,8 +863,10 @@ class SchedulerJob(BaseJob):
                 if next_run_date == dag.start_date:
                     next_run_date = dag.normalize_schedule(dag.start_date)
 
-                self.logger.debug("Dag start date: {}. Next run date: {}"
-                                  .format(dag.start_date, next_run_date))
+                self.logger.debug(
+                    "Dag start date: %s. Next run date: %s",
+                    dag.start_date, next_run_date
+                )
 
             # don't ever schedule in the future
             if next_run_date > datetime.now():
@@ -908,11 +914,13 @@ class SchedulerJob(BaseJob):
         dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, 
session=session)
         active_dag_runs = []
         for run in dag_runs:
-            self.logger.info("Examining DAG run {}".format(run))
+            self.logger.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
             if run.execution_date > datetime.now():
-                self.logger.error("Execution date is in future: {}"
-                                  .format(run.execution_date))
+                self.logger.error(
+                    "Execution date is in future: %s",
+                    run.execution_date
+                )
                 continue
 
             if len(active_dag_runs) >= dag.max_active_runs:
@@ -933,7 +941,7 @@ class SchedulerJob(BaseJob):
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
-            self.logger.debug("Examining active DAG run {}".format(run))
+            self.logger.debug("Examining active DAG run: %s", run)
             # this needs a fresh session sometimes tis get detached
             tis = run.get_task_instances(state=(State.NONE,
                                                 State.UP_FOR_RETRY))
@@ -954,7 +962,7 @@ class SchedulerJob(BaseJob):
                 if ti.are_dependencies_met(
                         dep_context=DepContext(flag_upstream_failed=True),
                         session=session):
-                    self.logger.debug('Queuing task: {}'.format(ti))
+                    self.logger.debug('Queuing task: %s', ti)
                     queue.append(ti.key)
 
         session.close()
@@ -1012,9 +1020,10 @@ class SchedulerJob(BaseJob):
             session.commit()
 
         if tis_changed > 0:
-            self.logger.warning("Set {} task instances to state={} as their 
associated "
-                                "DagRun was not in RUNNING state".format(
-                tis_changed, new_state))
+            self.logger.warning(
+                "Set %s task instances to state=%s as their associated DagRun 
was not in RUNNING state",
+                tis_changed, new_state
+            )
 
     @provide_session
     def _find_executable_task_instances(self, simple_dag_bag, states, 
session=None):
@@ -1066,7 +1075,7 @@ class SchedulerJob(BaseJob):
         # Put one task instance on each line
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in task_instances_to_examine])
-        self.logger.info("Tasks up for 
execution:\n\t{}".format(task_instance_str))
+        self.logger.info("Tasks up for execution:\n\t%s", task_instance_str)
 
         # Get the pool settings
         pools = {p.pool: p for p in session.query(models.Pool).all()}
@@ -1087,9 +1096,12 @@ class SchedulerJob(BaseJob):
                 open_slots = pools[pool].open_slots(session=session)
 
             num_queued = len(task_instances)
-            self.logger.info("Figuring out tasks to run in Pool(name={pool}) "
-                             "with {open_slots} open slots and {num_queued} "
-                             "task instances in queue".format(**locals()))
+            self.logger.info(
+                "Figuring out tasks to run in Pool(name=%s) with %s open slots "
+                "and %s task instances in queue",
+                pool, open_slots, num_queued
+            )
 
             priority_sorted_task_instances = sorted(
                 task_instances, key=lambda ti: (-ti.priority_weight, 
ti.execution_date))
@@ -1099,8 +1111,10 @@ class SchedulerJob(BaseJob):
 
             for task_instance in priority_sorted_task_instances:
                 if open_slots <= 0:
-                    self.logger.info("Not scheduling since there are {} open 
slots in pool {}"
-                        .format(open_slots, pool))
+                    self.logger.info(
+                        "Not scheduling since there are %s open slots in pool 
%s",
+                        open_slots, pool
+                    )
                     # Can't schedule any more since there are no more open 
slots.
                     break
 
@@ -1119,25 +1133,23 @@ class SchedulerJob(BaseJob):
 
                 current_task_concurrency = 
dag_id_to_possibly_running_task_count[dag_id]
                 task_concurrency_limit = 
simple_dag_bag.get_dag(dag_id).concurrency
-                self.logger.info("DAG {} has {}/{} running and queued tasks"
-                                 .format(dag_id,
-                                         current_task_concurrency,
-                                         task_concurrency_limit))
+                self.logger.info(
+                    "DAG %s has %s/%s running and queued tasks",
+                    dag_id, current_task_concurrency, task_concurrency_limit
+                )
                 if current_task_concurrency >= task_concurrency_limit:
-                    self.logger.info("Not executing {} since the number "
-                                     "of tasks running or queued from DAG {}"
-                                     " is >= to the "
-                                     "DAG's task concurrency limit of {}"
-                                     .format(task_instance,
-                                             dag_id,
-                                             task_concurrency_limit))
+                    self.logger.info(
+                        "Not executing %s since the number of tasks running or 
queued from DAG %s"
+                        " is >= to the DAG's task concurrency limit of %s",
+                        task_instance, dag_id, task_concurrency_limit
+                    )
                     continue
 
-
                 if self.executor.has_task(task_instance):
-                    self.logger.debug(("Not handling task {} as the executor " 
+
-                                      "reports it is running")
-                                      .format(task_instance.key))
+                    self.logger.debug(
+                        "Not handling task %s as the executor reports it is 
running",
+                        task_instance.key
+                    )
                     continue
                 executable_tis.append(task_instance)
                 open_slots -= 1
@@ -1145,8 +1157,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in executable_tis])
-        self.logger.info("Setting the follow tasks to queued state:\n\t{}"
-                         .format(task_instance_str))
+        self.logger.info("Setting the follow tasks to queued state:\n\t%s", 
task_instance_str)
         # so these dont expire on commit
         for ti in executable_tis:
             copy_dag_id = ti.dag_id
@@ -1225,8 +1236,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in tis_to_be_queued])
-        self.logger.info("Setting the follow tasks to queued state:\n\t{}"
-                         .format(task_instance_str))
+        self.logger.info("Setting the follow tasks to queued state:\n\t%s", 
task_instance_str)
         return tis_to_be_queued
 
     def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, 
task_instances):
@@ -1258,8 +1268,10 @@ class SchedulerJob(BaseJob):
 
             priority = task_instance.priority_weight
             queue = task_instance.queue
-            self.logger.info("Sending {} to executor with priority {} and 
queue {}"
-                             .format(task_instance.key, priority, queue))
+            self.logger.info(
+                "Sending %s to executor with priority %s and queue %s",
+                task_instance.key, priority, queue
+            )
 
             # save attributes so sqlalchemy doesnt expire them
             copy_dag_id = task_instance.dag_id
@@ -1345,20 +1357,18 @@ class SchedulerJob(BaseJob):
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
             if dag.is_paused:
-                self.logger.info("Not processing DAG {} since it's paused"
-                                 .format(dag.dag_id))
+                self.logger.info("Not processing DAG %s since it's paused", 
dag.dag_id)
                 continue
 
             if not dag:
-                self.logger.error("DAG ID {} was not found in the DagBag"
-                                  .format(dag.dag_id))
+                self.logger.error("DAG ID %s was not found in the DagBag", 
dag.dag_id)
                 continue
 
-            self.logger.info("Processing {}".format(dag.dag_id))
+            self.logger.info("Processing %s", dag.dag_id)
 
             dag_run = self.create_dag_run(dag)
             if dag_run:
-                self.logger.info("Created {}".format(dag_run))
+                self.logger.info("Created %s", dag_run)
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
@@ -1374,11 +1384,10 @@ class SchedulerJob(BaseJob):
         """
         for key, executor_state in 
list(self.executor.get_event_buffer().items()):
             dag_id, task_id, execution_date = key
-            self.logger.info("Executor reports {}.{} execution_date={} as {}"
-                             .format(dag_id,
-                                     task_id,
-                                     execution_date,
-                                     executor_state))
+            self.logger.info(
+                "Executor reports %s.%s execution_date=%s as %s",
+                dag_id, task_id, execution_date, executor_state
+            )
 
     def _log_file_processing_stats(self,
                                    known_file_paths,
@@ -1450,8 +1459,6 @@ class SchedulerJob(BaseJob):
         self.logger.info("Starting the scheduler")
         pessimistic_connection_handling()
 
-        logging.basicConfig(level=logging.DEBUG)
-
         # DAGs can be pickled for easier remote execution by some executors
         pickle_dags = False
         if self.do_pickle and self.executor.__class__ not in \
@@ -1462,22 +1469,16 @@ class SchedulerJob(BaseJob):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.logger.info("Processing files using up to {} processes at a time "
-                         .format(self.max_threads))
-        self.logger.info("Running execute loop for {} seconds"
-                         .format(self.run_duration))
-        self.logger.info("Processing each file at most {} times"
-                         .format(self.num_runs))
-        self.logger.info("Process each file at most once every {} seconds"
-                         .format(self.file_process_interval))
-        self.logger.info("Checking for new files in {} every {} seconds"
-                         .format(self.subdir, self.dag_dir_list_interval))
+        self.logger.info("Processing files using up to %s processes at a 
time", self.max_threads)
+        self.logger.info("Running execute loop for %s seconds", 
self.run_duration)
+        self.logger.info("Processing each file at most %s times", 
self.num_runs)
+        self.logger.info("Process each file at most once every %s seconds", 
self.file_process_interval)
+        self.logger.info("Checking for new files in %s every %s seconds", 
self.subdir, self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
-        self.logger.info("Searching for files in {}".format(self.subdir))
+        self.logger.info("Searching for files in %s", self.subdir)
         known_file_paths = list_py_file_paths(self.subdir)
-        self.logger.info("There are {} files in {}"
-                         .format(len(known_file_paths), self.subdir))
+        self.logger.info("There are %s files in %s", len(known_file_paths), 
self.subdir)
 
         def processor_factory(file_path, log_file_path):
             return DagFileProcessor(file_path,
@@ -1510,23 +1511,22 @@ class SchedulerJob(BaseJob):
                 child_processes = [x for x in 
this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 for child in child_processes:
-                    self.logger.info("Terminating child PID: 
{}".format(child.pid))
+                    self.logger.info("Terminating child PID: %s", child.pid)
                     child.terminate()
+                # TODO: Remove magic number
                 timeout = 5
-                self.logger.info("Waiting up to {}s for processes to exit..."
-                                 .format(timeout))
+                self.logger.info("Waiting up to %s seconds for processes to 
exit...", timeout)
                 try:
                     psutil.wait_procs(child_processes, timeout)
                 except psutil.TimeoutExpired:
-                    self.logger.debug("Ran out of time while waiting for "
-                                      "processes to exit")
+                    self.logger.debug("Ran out of time while waiting for 
processes to exit")
 
                 # Then SIGKILL
                 child_processes = [x for x in 
this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 if len(child_processes) > 0:
                     for child in child_processes:
-                        self.logger.info("Killing child PID: 
{}".format(child.pid))
+                        self.logger.info("Killing child PID: %s", child.pid)
                         child.kill()
                         child.wait()
 
@@ -1568,11 +1568,10 @@ class SchedulerJob(BaseJob):
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
-                self.logger.info("Searching for files in 
{}".format(self.subdir))
+                self.logger.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
                 last_dag_dir_refresh_time = datetime.now()
-                self.logger.info("There are {} files in {}"
-                                 .format(len(known_file_paths), self.subdir))
+                self.logger.info("There are %s files in %s", 
len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
                 self.logger.debug("Removing old import errors")
@@ -1585,8 +1584,7 @@ class SchedulerJob(BaseJob):
             if self.using_sqlite:
                 # For the sqlite case w/ 1 thread, wait until the processor
                 # is finished to avoid concurrent access to the DB.
-                self.logger.debug("Waiting for processors to finish since 
we're "
-                                  "using sqlite")
+                self.logger.debug("Waiting for processors to finish since 
we're using sqlite")
                 processor_manager.wait_until_finished()
 
             # Send tasks for execution if available
@@ -1638,16 +1636,13 @@ class SchedulerJob(BaseJob):
                 last_stat_print_time = datetime.now()
 
             loop_end_time = time.time()
-            self.logger.debug("Ran scheduling loop in {:.2f}s"
-                              .format(loop_end_time - loop_start_time))
-            self.logger.debug("Sleeping for {:.2f}s"
-                              .format(self._processor_poll_interval))
+            self.logger.debug("Ran scheduling loop in %.2f seconds", 
loop_end_time - loop_start_time)
+            self.logger.debug("Sleeping for %.2f seconds", 
self._processor_poll_interval)
             time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.logger.info("Exiting loop as all files have been 
processed "
-                                 "{} times".format(self.num_runs))
+                self.logger.info("Exiting loop as all files have been 
processed %s times", self.num_runs)
                 break
 
         # Stop any processors
@@ -1662,8 +1657,10 @@ class SchedulerJob(BaseJob):
                 all_files_processed = False
                 break
         if all_files_processed:
-            self.logger.info("Deactivating DAGs that haven't been touched 
since {}"
-                             .format(execute_start_time.isoformat()))
+            self.logger.info(
+                "Deactivating DAGs that haven't been touched since %s",
+                execute_start_time.isoformat()
+            )
             models.DAG.deactivate_stale_dags(execute_start_time)
 
         self.executor.end()
@@ -1696,23 +1693,21 @@ class SchedulerJob(BaseJob):
         :return: a list of SimpleDags made from the Dags found in the file
         :rtype: list[SimpleDag]
         """
-        self.logger.info("Processing file {} for tasks to 
queue".format(file_path))
+        self.logger.info("Processing file %s for tasks to queue", file_path)
         # As DAGs are parsed from this file, they will be converted into 
SimpleDags
         simple_dags = []
 
         try:
             dagbag = models.DagBag(file_path)
         except Exception:
-            self.logger.exception("Failed at reloading the DAG file 
{}".format(file_path))
+            self.logger.exception("Failed at reloading the DAG file %s", 
file_path)
             Stats.incr('dag_file_refresh_error', 1, 1)
             return []
 
         if len(dagbag.dags) > 0:
-            self.logger.info("DAG(s) {} retrieved from {}"
-                             .format(dagbag.dags.keys(),
-                                     file_path))
+            self.logger.info("DAG(s) %s retrieved from %s", 
dagbag.dags.keys(), file_path)
         else:
-            self.logger.warning("No viable dags retrieved from 
{}".format(file_path))
+            self.logger.warning("No viable dags retrieved from %s", file_path)
             self.update_import_errors(session, dagbag)
             return []
 
@@ -1783,7 +1778,7 @@ class SchedulerJob(BaseJob):
                 ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
-            self.logger.info("Creating / updating {} in ORM".format(ti))
+            self.logger.info("Creating / updating %s in ORM", ti)
             session.merge(ti)
             session.commit()
 
@@ -1914,25 +1909,22 @@ class BackfillJob(BaseJob):
             ti.refresh_from_db()
             if ti.state == State.SUCCESS:
                 ti_status.succeeded.add(key)
-                self.logger.debug("Task instance {} succeeded. "
-                                  "Don't rerun.".format(ti))
+                self.logger.debug("Task instance %s succeeded. Don't rerun.", 
ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.SKIPPED:
                 ti_status.skipped.add(key)
-                self.logger.debug("Task instance {} skipped. "
-                                  "Don't rerun.".format(ti))
+                self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.FAILED:
-                self.logger.error("Task instance {} failed".format(ti))
+                self.logger.error("Task instance %s failed", ti)
                 ti_status.failed.add(key)
                 ti_status.started.pop(key)
                 continue
             # special case: if the task needs to run again put it back
             elif ti.state == State.UP_FOR_RETRY:
-                self.logger.warning("Task instance {} is up for retry"
-                                    .format(ti))
+                self.logger.warning("Task instance %s is up for retry", ti)
                 ti_status.started.pop(key)
                 ti_status.to_run[key] = ti
             # special case: The state of the task can be set to NONE by the 
task itself
@@ -1941,9 +1933,11 @@ class BackfillJob(BaseJob):
             # for that as otherwise those tasks would fall outside of the 
scope of
             # the backfill suddenly.
             elif ti.state == State.NONE:
-                self.logger.warning("FIXME: task instance {} state was set to "
-                                    "None externally or reaching concurrency 
limits. "
-                                    "Re-adding task to queue.".format(ti))
+                self.logger.warning(
+                    "FIXME: task instance %s state was set to none externally 
or "
+                    "reaching concurrency limits. Re-adding task to queue.",
+                    ti
+                )
                 session = settings.Session()
                 ti.set_state(State.SCHEDULED, session=session)
                 session.close()
@@ -1960,14 +1954,16 @@ class BackfillJob(BaseJob):
 
         for key, state in list(executor.get_event_buffer().items()):
             if key not in started:
-                self.logger.warning("{} state {} not in started={}"
-                                    .format(key, state, started.values()))
+                self.logger.warning(
+                    "%s state %s not in started=%s",
+                    key, state, started.values()
+                )
                 continue
 
             ti = started[key]
             ti.refresh_from_db()
 
-            self.logger.debug("Executor state: {} task {}".format(state, ti))
+            self.logger.debug("Executor state: %s task %s", state, ti)
 
             if state == State.FAILED or state == State.SUCCESS:
                 if ti.state == State.RUNNING or ti.state == State.QUEUED:
@@ -2090,9 +2086,10 @@ class BackfillJob(BaseJob):
             len(ti_status.not_ready))
         self.logger.info(msg)
 
-        self.logger.debug("Finished dag run loop iteration. "
-                          "Remaining tasks {}"
-                          .format(ti_status.to_run.values()))
+        self.logger.debug(
+            "Finished dag run loop iteration. Remaining tasks %s",
+            ti_status.to_run.values()
+        )
 
     @provide_session
     def _process_backfill_task_instances(self,
@@ -2142,43 +2139,41 @@ class BackfillJob(BaseJob):
                     ignore_depends_on_past = (
                         self.ignore_first_depends_on_past and
                         ti.execution_date == (start_date or ti.start_date))
-                    self.logger.debug("Task instance to run {} state {}"
-                                      .format(ti, ti.state))
+                    self.logger.debug("Task instance to run %s state %s", ti, 
ti.state)
 
                     # guard against externally modified tasks instances or
                     # in case max concurrency has been reached at task runtime
                     if ti.state == State.NONE:
-                        self.logger.warning("FIXME: task instance {} state was 
set to "
-                                            "None externally. This should not 
happen")
+                        self.logger.warning(
+                            "FIXME: task instance %s state was set to None "
+                            "externally. This should not happen",
+                            ti
+                        )
                         ti.set_state(State.SCHEDULED, session=session)
 
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
                         ti_status.succeeded.add(key)
-                        self.logger.debug("Task instance {} succeeded. "
-                                          "Don't rerun.".format(ti))
+                        self.logger.debug("Task instance %s succeeded. Don't 
rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
                         ti_status.skipped.add(key)
-                        self.logger.debug("Task instance {} skipped. "
-                                          "Don't rerun.".format(ti))
+                        self.logger.debug("Task instance %s skipped. Don't 
rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
-                        self.logger.error("Task instance {} failed".format(ti))
+                        self.logger.error("Task instance %s failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance {} upstream 
failed".format(ti))
+                        self.logger.error("Task instance %s upstream failed", 
ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2200,10 +2195,12 @@ class BackfillJob(BaseJob):
                         ti.refresh_from_db(lock_for_update=True, 
session=session)
                         if ti.state == State.SCHEDULED or ti.state == 
State.UP_FOR_RETRY:
                             if executor.has_task(ti):
-                                self.logger.debug("Task Instance {} already in 
executor "
-                                                  "waiting for queue to 
clear".format(ti))
+                                self.logger.debug(
+                                    "Task Instance %s already in executor 
waiting for queue to clear",
+                                    ti
+                                )
                             else:
-                                self.logger.debug('Sending {} to 
executor'.format(ti))
+                                self.logger.debug('Sending %s to executor', ti)
                                 # Skip scheduled state, we are executing 
immediately
                                 ti.state = State.QUEUED
                                 session.merge(ti)
@@ -2220,7 +2217,7 @@ class BackfillJob(BaseJob):
                         continue
 
                     if ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance {} upstream 
failed".format(ti))
+                        self.logger.error("Task instance %s upstream failed", 
ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2229,15 +2226,14 @@ class BackfillJob(BaseJob):
 
                     # special case
                     if ti.state == State.UP_FOR_RETRY:
-                        self.logger.debug("Task instance {} retry period not 
expired yet"
-                                          .format(ti))
+                        self.logger.debug("Task instance %s retry period not 
expired yet", ti)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         ti_status.to_run[key] = ti
                         continue
 
                     # all remaining tasks
-                    self.logger.debug('Adding {} to not_ready'.format(ti))
+                    self.logger.debug('Adding %s to not_ready', ti)
                     ti_status.not_ready.add(key)
 
             # execute the tasks in the queue
@@ -2250,8 +2246,10 @@ class BackfillJob(BaseJob):
             if (ti_status.not_ready and
                     ti_status.not_ready == set(ti_status.to_run) and
                     len(ti_status.started) == 0):
-                self.logger.warning("Deadlock discovered for 
ti_status.to_run={}"
-                                    .format(ti_status.to_run.values()))
+                self.logger.warning(
+                    "Deadlock discovered for ti_status.to_run=%s",
+                    ti_status.to_run.values()
+                )
                 ti_status.deadlocked.update(ti_status.to_run.values())
                 ti_status.to_run.clear()
 
@@ -2284,7 +2282,7 @@ class BackfillJob(BaseJob):
         if ti_status.failed:
             err += (
                 "---------------------------------------------------\n"
-                "Some task instances failed:\n{}\n".format(ti_status.failed))
+                "Some task instances failed:\n{}\n".format(ti_status.failed))
         if ti_status.deadlocked:
             err += (
                 '---------------------------------------------------\n'
@@ -2367,8 +2365,7 @@ class BackfillJob(BaseJob):
         run_dates = self.dag.get_run_dates(start_date=start_date,
                                            end_date=self.bf_end_date)
         if len(run_dates) == 0:
-            self.logger.info("No run dates were found for the given dates and 
dag "
-                             "interval.")
+            self.logger.info("No run dates were found for the given dates and 
dag interval.")
             return
 
         # picklin'
@@ -2406,9 +2403,11 @@ class BackfillJob(BaseJob):
                     raise AirflowException(err)
 
                 if remaining_dates > 0:
-                    self.logger.info(("max_active_runs limit for dag {} has 
been reached "
-                                     " - waiting for other dag runs to finish")
-                                     .format(self.dag_id))
+                    self.logger.info(
+                        "max_active_runs limit for dag %s has been reached "
+                        "- waiting for other dag runs to finish",
+                        self.dag_id
+                    )
                     time.sleep(self.delay_on_limit_secs)
         finally:
             executor.end()
@@ -2454,8 +2453,8 @@ class LocalTaskJob(BaseJob):
         self.task_runner = get_task_runner(self)
 
         def signal_handler(signum, frame):
-            '''Setting kill signal handler'''
-            logging.error("Killing subprocess")
+            """Setting kill signal handler"""
+            self.logger.error("Killing subprocess")
             self.on_kill()
             raise AirflowException("LocalTaskJob received SIGTERM signal")
         signal.signal(signal.SIGTERM, signal_handler)
@@ -2468,8 +2467,8 @@ class LocalTaskJob(BaseJob):
                 ignore_ti_state=self.ignore_ti_state,
                 job_id=self.id,
                 pool=self.pool):
-            self.logger.info("Task is not able to be run") 
-            return 
+            self.logger.info("Task is not able to be run")
+            return
 
         try:
             self.task_runner.start()
@@ -2481,8 +2480,7 @@ class LocalTaskJob(BaseJob):
                 # Monitor the task to see if it's done
                 return_code = self.task_runner.return_code()
                 if return_code is not None:
-                    self.logger.info("Task exited with return code {}"
-                                     .format(return_code))
+                    self.logger.info("Task exited with return code %s", 
return_code)
                     return
 
                 # Periodically heartbeat so that the scheduler doesn't think 
this
@@ -2492,8 +2490,10 @@ class LocalTaskJob(BaseJob):
                     last_heartbeat_time = time.time()
                 except OperationalError:
                     Stats.incr('local_task_job_heartbeat_failure', 1, 1)
-                    self.logger.exception("Exception while trying to 
heartbeat! "
-                                          "Sleeping for 
{}s".format(self.heartrate))
+                    self.logger.exception(
+                        "Exception while trying to heartbeat! Sleeping for %s 
seconds",
+                        self.heartrate
+                    )
                     time.sleep(self.heartrate)
 
                 # If it's been too long since we've heartbeat, then it's 
possible that
@@ -2531,19 +2531,20 @@ class LocalTaskJob(BaseJob):
 
         if ti.state == State.RUNNING:
             if not same_hostname:
-                logging.warning("The recorded hostname {ti.hostname} "
+                self.logger.warning("The recorded hostname {ti.hostname} "
                                 "does not match this instance's hostname "
                                 "{fqdn}".format(**locals()))
                 raise AirflowException("Hostname of job runner does not match")
             elif not same_process:
                 current_pid = os.getpid()
-                logging.warning("Recorded pid {ti.pid} does not match the 
current pid "
+                self.logger.warning("Recorded pid {ti.pid} does not match the 
current pid "
                                 "{current_pid}".format(**locals()))
                 raise AirflowException("PID of job runner does not match")
         elif (self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):
-            logging.warning(
-                "State of this instance has been externally set to "
-                "{}. Taking the poison pill. So long.".format(ti.state))
+            self.logger.warning(
+                "State of this instance has been externally set to %s. Taking 
the poison pill.",
+                ti.state
+            )
             self.task_runner.terminate()
             self.terminating = True

Reply via email to