http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/local_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index a58c3d7..9730737 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -20,14 +20,13 @@ from builtins import range from airflow import configuration from airflow.executors.base_executor import BaseExecutor +from airflow.utils.log.LoggingMixin import LoggingMixin from airflow.utils.state import State -from airflow.utils.logging import LoggingMixin PARALLELISM = configuration.get('core', 'PARALLELISM') class LocalWorker(multiprocessing.Process, LoggingMixin): - def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue @@ -41,15 +40,15 @@ class LocalWorker(multiprocessing.Process, LoggingMixin): # Received poison pill, no more tasks to run self.task_queue.task_done() break - self.logger.info("{} running {}".format( - self.__class__.__name__, command)) + self.logger.info("%s running %s", self.__class__.__name__, command) command = "exec bash -c '{0}'".format(command) try: subprocess.check_call(command, shell=True) state = State.SUCCESS except subprocess.CalledProcessError as e: state = State.FAILED - self.logger.error("failed to execute task {}:".format(str(e))) + self.logger.error("Failed to execute task %s.", str(e)) + # TODO: Why is this commented out? # raise e self.result_queue.put((key, state)) self.task_queue.task_done() @@ -68,7 +67,7 @@ class LocalExecutor(BaseExecutor): self.result_queue = multiprocessing.Queue() self.workers = [ LocalWorker(self.queue, self.result_queue) - for i in range(self.parallelism) + for _ in range(self.parallelism) ] for w in self.workers:
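The hunk above shows the two changes this commit applies across the whole code base: classes now obtain their logger from the new airflow.utils.log.LoggingMixin module rather than airflow.utils.logging, and log calls pass arguments lazily ("%s running %s", ...) instead of pre-rendering the message with str.format(). Below is a minimal sketch of what such a mixin provides; the `logger` property name matches the diffs, but the implementation details (caching, exact logger naming) are assumptions, not the actual Airflow source.

import logging

class LoggingMixin(object):
    """Minimal sketch of the mixin pattern this commit standardizes on.

    Exposes ``self.logger`` on any subclass, named after the concrete
    class so records can be filtered per component. The real
    airflow.utils.log.LoggingMixin may cache the logger and derive the
    name differently.
    """
    @property
    def logger(self):
        # Build a dotted name such as
        # "airflow.executors.local_executor.LocalWorker".
        name = '.'.join([self.__class__.__module__, self.__class__.__name__])
        return logging.getLogger(name)

class Worker(LoggingMixin):
    def run(self, command):
        # Lazy formatting: interpolation is deferred to the logging
        # framework and skipped entirely if INFO is disabled.
        self.logger.info("%s running %s", self.__class__.__name__, command)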
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/sequential_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 43180cc..7d08a4b 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -37,14 +37,14 @@ class SequentialExecutor(BaseExecutor): def sync(self): for key, command in self.commands_to_run: - self.logger.info("Executing command: {}".format(command)) + self.logger.info("Executing command: %s", command) try: subprocess.check_call(command, shell=True) self.change_state(key, State.SUCCESS) except subprocess.CalledProcessError as e: self.change_state(key, State.FAILED) - self.logger.error("Failed to execute task {}:".format(str(e))) + self.logger.error("Failed to execute task %s.", str(e)) self.commands_to_run = [] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/S3_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py index caaa575..2f7e6ee 100644 --- a/airflow/hooks/S3_hook.py +++ b/airflow/hooks/S3_hook.py @@ -13,9 +13,12 @@ # limitations under the License. from __future__ import division + from future import standard_library + +from airflow.utils.log.LoggingMixin import LoggingMixin + standard_library.install_aliases() -import logging import re import fnmatch import configparser @@ -27,8 +30,8 @@ import warnings import boto from boto.s3.connection import S3Connection, NoHostProvided from boto.sts import STSConnection + boto.set_stream_logger('boto') -logging.getLogger("boto").setLevel(logging.INFO) from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook @@ -84,7 +87,8 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None): if Config.has_option(cred_section, 'calling_format'): calling_format = Config.get(cred_section, 'calling_format') except: - logging.warning("Option Error in parsing s3 config file") + log = LoggingMixin().logger + log.warning("Option Error in parsing s3 config file") raise return (access_key, secret_key, calling_format) @@ -94,8 +98,8 @@ class S3Hook(BaseHook): Interact with S3. This class is a wrapper around the boto library. 
""" def __init__( - self, - s3_conn_id='s3_default'): + self, + s3_conn_id='s3_default'): self.s3_conn_id = s3_conn_id self.s3_conn = self.get_connection(s3_conn_id) self.extra_params = self.s3_conn.extra_dejson @@ -164,8 +168,8 @@ class S3Hook(BaseHook): a_key = s_key = None if self._creds_in_config_file: a_key, s_key, calling_format = _parse_s3_config(self.s3_config_file, - self.s3_config_format, - self.profile) + self.s3_config_format, + self.profile) elif self._creds_in_conn: a_key = self._a_key s_key = self._s_key @@ -185,14 +189,14 @@ class S3Hook(BaseHook): assumed_role_object = sts_connection.assume_role( role_arn=self.role_arn, role_session_name="Airflow_" + self.s3_conn_id - ) + ) creds = assumed_role_object.credentials connection = S3Connection( aws_access_key_id=creds.access_key, aws_secret_access_key=creds.secret_key, calling_format=calling_format, security_token=creds.session_token - ) + ) else: connection = S3Connection(aws_access_key_id=a_key, aws_secret_access_key=s_key, @@ -323,13 +327,13 @@ class S3Hook(BaseHook): return False if plist is None else prefix in plist def load_file( - self, - filename, - key, - bucket_name=None, - replace=False, - multipart_bytes=5 * (1024 ** 3), - encrypt=False): + self, + filename, + key, + bucket_name=None, + replace=False, + multipart_bytes=5 * (1024 ** 3), + encrypt=False): """ Loads a local file to S3 @@ -373,10 +377,8 @@ class S3Hook(BaseHook): for chunk in range(total_chunks): offset = chunk * multipart_bytes bytes = min(multipart_bytes, key_size - offset) - with FileChunkIO( - filename, 'r', offset=offset, bytes=bytes) as fp: - logging.info('Sending chunk {c} of {tc}...'.format( - c=chunk + 1, tc=total_chunks)) + with FileChunkIO(filename, 'r', offset=offset, bytes=bytes) as fp: + self.logger.info('Sending chunk %s of %s...', chunk + 1, total_chunks) mp.upload_part_from_file(fp, part_num=chunk + 1) except: mp.cancel_upload() @@ -389,8 +391,9 @@ class S3Hook(BaseHook): key_size = key_obj.set_contents_from_filename(filename, replace=replace, encrypt_key=encrypt) - logging.info("The key {key} now contains" - " {key_size} bytes".format(**locals())) + self.logger.info( + "The key {key} now contains {key_size} bytes".format(**locals()) + ) def load_string(self, string_data, key, bucket_name=None, @@ -429,5 +432,6 @@ class S3Hook(BaseHook): key_size = key_obj.set_contents_from_string(string_data, replace=replace, encrypt_key=encrypt) - logging.info("The key {key} now contains" - " {key_size} bytes".format(**locals())) + self.logger.info( + "The key {key} now contains {key_size} bytes".format(**locals()) + ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/base_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py index cef8c97..4617b98 100644 --- a/airflow/hooks/base_hook.py +++ b/airflow/hooks/base_hook.py @@ -17,19 +17,18 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals -from builtins import object -import logging import os import random from airflow import settings from airflow.models import Connection from airflow.exceptions import AirflowException +from airflow.utils.log.LoggingMixin import LoggingMixin CONN_ENV_PREFIX = 'AIRFLOW_CONN_' -class BaseHook(object): +class BaseHook(LoggingMixin): """ Abstract base class for hooks, hooks are meant as an interface to interact with external systems. 
MySqlHook, HiveHook, PigHook return @@ -40,6 +39,7 @@ class BaseHook(object): def __init__(self, source): pass + @classmethod def _get_connections_from_db(cls, conn_id): session = settings.Session() @@ -76,7 +76,8 @@ class BaseHook(object): def get_connection(cls, conn_id): conn = random.choice(cls.get_connections(conn_id)) if conn.host: - logging.info("Using connection to: " + conn.host) + log = LoggingMixin().logger + log.info("Using connection to: %s", conn.host) return conn @classmethod http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/dbapi_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index fe85153..85eebd0 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -16,8 +16,6 @@ from builtins import str from past.builtins import basestring from datetime import datetime from contextlib import closing -import numpy -import logging import sys from sqlalchemy import create_engine @@ -88,8 +86,8 @@ class DbApiHook(BaseHook): if sys.version_info[0] < 3: sql = sql.encode('utf-8') import pandas.io.sql as psql - - with closing(self.get_conn()) as conn: + + with closing(self.get_conn()) as conn: return psql.read_sql(sql, con=conn, params=parameters) def get_records(self, sql, parameters=None): @@ -104,7 +102,7 @@ class DbApiHook(BaseHook): """ if sys.version_info[0] < 3: sql = sql.encode('utf-8') - + with closing(self.get_conn()) as conn: with closing(conn.cursor()) as cur: if parameters is not None: @@ -125,7 +123,7 @@ class DbApiHook(BaseHook): """ if sys.version_info[0] < 3: sql = sql.encode('utf-8') - + with closing(self.get_conn()) as conn: with closing(conn.cursor()) as cur: if parameters is not None: @@ -151,21 +149,21 @@ class DbApiHook(BaseHook): """ if isinstance(sql, basestring): sql = [sql] - + with closing(self.get_conn()) as conn: if self.supports_autocommit: self.set_autocommit(conn, autocommit) - + with closing(conn.cursor()) as cur: for s in sql: if sys.version_info[0] < 3: s = s.encode('utf-8') - logging.info(s) + self.logger.info(s) if parameters is not None: cur.execute(s, parameters) else: cur.execute(s) - + conn.commit() def set_autocommit(self, conn, autocommit): @@ -197,13 +195,13 @@ class DbApiHook(BaseHook): target_fields = "({})".format(target_fields) else: target_fields = '' - + with closing(self.get_conn()) as conn: if self.supports_autocommit: self.set_autocommit(conn, False) - + conn.commit() - + with closing(conn.cursor()) as cur: for i, row in enumerate(rows, 1): l = [] @@ -218,11 +216,12 @@ class DbApiHook(BaseHook): cur.execute(sql, values) if commit_every and i % commit_every == 0: conn.commit() - logging.info( - "Loaded {i} into {table} rows so far".format(**locals())) - + self.logger.info( + "Loaded {i} into {table} rows so far".format(**locals()) + ) + conn.commit() - logging.info( + self.logger.info( "Done loading. 
Loaded a total of {i} rows".format(**locals())) @staticmethod http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/druid_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index 350c230..af3ae9b 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -14,7 +14,6 @@ from __future__ import print_function -import logging import requests import time @@ -33,7 +32,6 @@ class DruidHook(BaseHook): :param max_ingestion_time: The maximum ingestion time before assuming the job failed :type max_ingestion_time: int """ - def __init__( self, druid_ingest_conn_id='druid_ingest_default', @@ -70,12 +68,12 @@ class DruidHook(BaseHook): while running: req_status = requests.get("{0}/{1}/status".format(url, druid_task_id)) - logging.info("Job still running for {0} seconds...".format(sec)) + self.logger.info("Job still running for %s seconds...", sec) sec = sec + 1 if sec > self.max_ingestion_time: - raise AirflowException('Druid ingestion took more than {} seconds'.format(self.max_ingestion_time)) + raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time) time.sleep(self.timeout) @@ -87,6 +85,6 @@ class DruidHook(BaseHook): elif status == 'FAILED': raise AirflowException('Druid indexing job failed, check console for more info') else: - raise AirflowException('Could not get status of the job, got {0}'.format(status)) + raise AirflowException('Could not get status of the job, got %s', status) - logging.info('Successful index') + self.logger.info('Successful index') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/hive_hooks.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py index d120769..70d7642 100644 --- a/airflow/hooks/hive_hooks.py +++ b/airflow/hooks/hive_hooks.py @@ -17,10 +17,8 @@ from __future__ import print_function from builtins import zip from past.builtins import basestring -import collections import unicodecsv as csv import itertools -import logging import re import subprocess import time @@ -38,7 +36,6 @@ HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'] class HiveCliHook(BaseHook): - """Simple wrapper around the hive CLI. 
It also supports the ``beeline`` @@ -204,7 +201,7 @@ class HiveCliHook(BaseHook): hive_cmd.extend(['-f', f.name]) if verbose: - logging.info(" ".join(hive_cmd)) + self.logger.info(" ".join(hive_cmd)) sp = subprocess.Popen( hive_cmd, stdout=subprocess.PIPE, @@ -218,7 +215,7 @@ class HiveCliHook(BaseHook): break stdout += line.decode('UTF-8') if verbose: - logging.info(line.decode('UTF-8').strip()) + self.logger.info(line.decode('UTF-8').strip()) sp.wait() if sp.returncode: @@ -249,7 +246,7 @@ class HiveCliHook(BaseHook): for query in query_set: query_preview = ' '.join(query.split())[:50] - logging.info("Testing HQL [{0} (...)]".format(query_preview)) + self.logger.info("Testing HQL [%s (...)]", query_preview) if query_set == insert: query = other + '; explain ' + query else: @@ -258,16 +255,16 @@ class HiveCliHook(BaseHook): self.run_cli(query, verbose=False) except AirflowException as e: message = e.args[0].split('\n')[-2] - logging.info(message) + self.logger.info(message) error_loc = re.search('(\d+):(\d+)', message) if error_loc and error_loc.group(1).isdigit(): l = int(error_loc.group(1)) begin = max(l-2, 0) end = min(l+3, len(query.split('\n'))) context = '\n'.join(query.split('\n')[begin:end]) - logging.info("Context :\n {0}".format(context)) + self.logger.info("Context :\n %s", context) else: - logging.info("SUCCESS") + self.logger.info("SUCCESS") def load_df( self, @@ -356,7 +353,7 @@ class HiveCliHook(BaseHook): final destination using a ``HiveOperator``. :param filepath: local filepath of the file to load - :type filepath: str + :type filepath: str :param table: target Hive table, use dot notation to target a specific database :type table: str @@ -398,9 +395,9 @@ class HiveCliHook(BaseHook): tprops = ", ".join( ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()]) hql += "TBLPROPERTIES({tprops})\n" - hql += ";" + hql += ";" hql = hql.format(**locals()) - logging.info(hql) + self.logger.info(hql) self.run_cli(hql) hql = "LOAD DATA LOCAL INPATH '{filepath}' " if overwrite: @@ -411,7 +408,7 @@ class HiveCliHook(BaseHook): ["{0}='{1}'".format(k, v) for k, v in partition.items()]) hql += "PARTITION ({pvals});" hql = hql.format(**locals()) - logging.info(hql) + self.logger.info(hql) self.run_cli(hql) def kill(self): @@ -665,8 +662,10 @@ class HiveServer2Hook(BaseHook): # impyla uses GSSAPI instead of KERBEROS as a auth_mechanism identifier if auth_mechanism == 'KERBEROS': - logging.warning("Detected deprecated 'KERBEROS' for authMechanism for %s. Please use 'GSSAPI' instead", - self.hiveserver2_conn_id) + self.logger.warning( + "Detected deprecated 'KERBEROS' for authMechanism for %s. Please use 'GSSAPI' instead", + self.hiveserver2_conn_id + ) auth_mechanism = 'GSSAPI' from impala.dbapi import connect @@ -697,7 +696,7 @@ class HiveServer2Hook(BaseHook): # may be `SET` or DDL records = cur.fetchall() except ProgrammingError: - logging.debug("get_results returned no records") + self.logger.debug("get_results returned no records") if records: results = { 'data': records, @@ -717,7 +716,7 @@ class HiveServer2Hook(BaseHook): schema = schema or 'default' with self.get_conn(schema) as conn: with conn.cursor() as cur: - logging.info("Running query: " + hql) + self.logger.info("Running query: %s", hql) cur.execute(hql) schema = cur.description with open(csv_filepath, 'wb') as f: @@ -735,8 +734,8 @@ class HiveServer2Hook(BaseHook): writer.writerows(rows) i += len(rows) - logging.info("Written {0} rows so far.".format(i)) - logging.info("Done. 
Loaded a total of {0} rows.".format(i)) + self.logger.info("Written %s rows so far.", i) + self.logger.info("Done. Loaded a total of %s rows.", i) def get_records(self, hql, schema='default'): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/http_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py index a144143..f168bc8 100644 --- a/airflow/hooks/http_hook.py +++ b/airflow/hooks/http_hook.py @@ -13,7 +13,6 @@ # limitations under the License. from builtins import str -import logging import requests @@ -83,7 +82,7 @@ class HttpHook(BaseHook): headers=headers) prepped_request = session.prepare_request(req) - logging.info("Sending '" + self.method + "' to url: " + url) + self.logger.info("Sending '%s' to url: %s", self.method, url) return self.run_and_check(session, prepped_request, extra_options) def run_and_check(self, session, prepped_request, extra_options): @@ -108,12 +107,12 @@ class HttpHook(BaseHook): # Tried rewrapping, but not supported. This way, it's possible # to get reason and code for failure by checking first 3 chars # for the code, or do a split on ':' - logging.error("HTTP error: " + response.reason) + self.logger.error("HTTP error: %s", response.reason) if self.method not in ('GET', 'HEAD'): # The sensor uses GET, so this prevents filling up the log # with the body every time the GET 'misses'. # That's ok to do, because GETs should be repeatable and # all data should be visible in the log (no post data) - logging.error(response.text) + self.logger.error(response.text) raise AirflowException(str(response.status_code)+":"+response.reason) return response http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/oracle_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py index 1386d6d..f439daa 100644 --- a/airflow/hooks/oracle_hook.py +++ b/airflow/hooks/oracle_hook.py @@ -19,7 +19,6 @@ from builtins import str from past.builtins import basestring from datetime import datetime import numpy -import logging class OracleHook(DbApiHook): @@ -102,11 +101,11 @@ class OracleHook(DbApiHook): cur.execute(sql) if i % commit_every == 0: conn.commit() - logging.info('Loaded {i} into {table} rows so far'.format(**locals())) + self.logger.info('Loaded {i} into {table} rows so far'.format(**locals())) conn.commit() cur.close() conn.close() - logging.info('Done loading. Loaded a total of {i} rows'.format(**locals())) + self.logger.info('Done loading. Loaded a total of {i} rows'.format(**locals())) def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000): """A performant bulk insert for cx_Oracle that uses prepared statements via `executemany()`. 
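A recurring change in the hunks above is replacing eagerly formatted messages (logging.info("... {}".format(x))) with lazy %-style arguments (self.logger.info("... %s", x)). The difference matters on hot paths: with lazy arguments the string is only built if a handler actually emits the record. Note that this is a logging-only feature; "%s" placeholders handed to AirflowException or left inside a str.format() template (as in the druid hook and backfill error-summary hunks) are not interpolated. A small self-contained illustration of both points:

import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger(__name__)

rows = 1000
# Eager: "Loaded 1000 rows" is built even though INFO is filtered out.
log.info("Loaded {} rows".format(rows))
# Lazy: the interpolation never happens, since no handler wants the record.
log.info("Loaded %s rows", rows)

# Exceptions do not apply %-formatting to their arguments:
print(Exception("took more than %s seconds", 600))
# -> ('took more than %s seconds', 600)  -- stored as a tuple, not interpolated
print(Exception("took more than {} seconds".format(600)))
# -> took more than 600 seconds          -- interpolated up front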
@@ -130,13 +129,13 @@ class OracleHook(DbApiHook): cursor.prepare(prepared_stm) cursor.executemany(None, row_chunk) conn.commit() - logging.info('[%s] inserted %s rows', table, row_count) + self.logger.info('[%s] inserted %s rows', table, row_count) # Empty chunk row_chunk = [] # Commit the leftover chunk cursor.prepare(prepared_stm) cursor.executemany(None, row_chunk) conn.commit() - logging.info('[%s] inserted %s rows', table, row_count) + self.logger.info('[%s] inserted %s rows', table, row_count) cursor.close() conn.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/pig_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py index 7201b9f..29beb54 100644 --- a/airflow/hooks/pig_hook.py +++ b/airflow/hooks/pig_hook.py @@ -13,14 +13,12 @@ # limitations under the License. from __future__ import print_function -import logging import subprocess from tempfile import NamedTemporaryFile from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.utils.file import TemporaryDirectory -from airflow import configuration class PigCliHook(BaseHook): @@ -64,7 +62,7 @@ class PigCliHook(BaseHook): pig_properties_list = self.pig_properties.split() pig_cmd.extend(pig_properties_list) if verbose: - logging.info(" ".join(pig_cmd)) + self.logger.info(" ".join(pig_cmd)) sp = subprocess.Popen( pig_cmd, stdout=subprocess.PIPE, @@ -75,7 +73,7 @@ class PigCliHook(BaseHook): for line in iter(sp.stdout.readline, ''): stdout += line if verbose: - logging.info(line.strip()) + self.logger.info(line.strip()) sp.wait() if sp.returncode: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/presto_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/presto_hook.py b/airflow/hooks/presto_hook.py index 768ff3f..b9f30a6 100644 --- a/airflow/hooks/presto_hook.py +++ b/airflow/hooks/presto_hook.py @@ -13,16 +13,12 @@ # limitations under the License. 
from builtins import str -import logging from pyhive import presto from pyhive.exc import DatabaseError from airflow.hooks.dbapi_hook import DbApiHook -logging.getLogger("pyhive").setLevel(logging.INFO) - - class PrestoException(Exception): pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/webhdfs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py index 5e2a28d..e7df328 100644 --- a/airflow/hooks/webhdfs_hook.py +++ b/airflow/hooks/webhdfs_hook.py @@ -14,16 +14,18 @@ from airflow.hooks.base_hook import BaseHook from airflow import configuration -import logging from hdfs import InsecureClient, HdfsError +from airflow.utils.log.LoggingMixin import LoggingMixin + _kerberos_security_mode = configuration.get("core", "security") == "kerberos" if _kerberos_security_mode: try: from hdfs.ext.kerberos import KerberosClient except ImportError: - logging.error("Could not load the Kerberos extension for the WebHDFSHook.") + log = LoggingMixin().logger + log.error("Could not load the Kerberos extension for the WebHDFSHook.") raise from airflow.exceptions import AirflowException @@ -47,7 +49,7 @@ class WebHDFSHook(BaseHook): nn_connections = self.get_connections(self.webhdfs_conn_id) for nn in nn_connections: try: - logging.debug('Trying namenode {}'.format(nn.host)) + self.logger.debug('Trying namenode %s', nn.host) connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn) if _kerberos_security_mode: client = KerberosClient(connection_str) @@ -55,11 +57,12 @@ proxy_user = self.proxy_user or nn.login client = InsecureClient(connection_str, user=proxy_user) client.status('/') - logging.debug('Using namenode {} for hook'.format(nn.host)) + self.logger.debug('Using namenode %s for hook', nn.host) return client except HdfsError as e: - logging.debug("Read operation on namenode {nn.host} failed with" - " error: {e.message}".format(**locals())) + self.logger.debug( + "Read operation on namenode {nn.host} failed with error: {e.message}".format(**locals()) + ) nn_hosts = [c.host for c in nn_connections] no_nn_error = "Read operations failed on the namenodes below:\n{}".format("\n".join(nn_hosts)) raise AirflowWebHDFSHookException(no_nn_error) @@ -98,4 +101,4 @@ class WebHDFSHook(BaseHook): overwrite=overwrite, n_threads=parallelism, **kwargs) - logging.debug("Uploaded file {} to {}".format(source, destination)) + self.logger.debug("Uploaded file %s to %s", source, destination) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/hooks/zendesk_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py index 907d1e8..4634b22 100644 --- a/airflow/hooks/zendesk_hook.py +++ b/airflow/hooks/zendesk_hook.py @@ -12,19 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License.
# - - -""" -A hook to talk to Zendesk -""" - -import logging import time from zdesk import Zendesk, RateLimitError, ZendeskError from airflow.hooks.base_hook import BaseHook class ZendeskHook(BaseHook): + """ + A hook to talk to Zendesk + """ def __init__(self, zendesk_conn_id): self.__zendesk_conn_id = zendesk_conn_id self.__url = None @@ -41,10 +37,10 @@ class ZendeskHook(BaseHook): """ retry_after = int( rate_limit_exception.response.headers.get('Retry-After', 60)) - logging.info( - "Hit Zendesk API rate limit. Pausing for {} " - "seconds".format( - retry_after)) + self.logger.info( + "Hit Zendesk API rate limit. Pausing for %s seconds", + retry_after + ) time.sleep(retry_after) def call(self, path, query=None, get_all_pages=True): @@ -79,7 +75,7 @@ class ZendeskHook(BaseHook): # `github.zendesk...` # in it, but the call function needs it removed. next_url = next_page.split(self.__url)[1] - logging.info("Calling {}".format(next_url)) + self.logger.info("Calling %s", next_url) more_res = zendesk.call(next_url) results.extend(more_res[key]) if next_page == more_res['next_page']: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 138a055..904609c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -17,48 +17,43 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals -from past.builtins import basestring -from collections import defaultdict, Counter - -from datetime import datetime - import getpass -import logging -import socket import multiprocessing import os +import psutil import signal import six +import socket import sys import threading import time -from time import sleep - -import psutil +from collections import defaultdict +from datetime import datetime +from past.builtins import basestring from sqlalchemy import ( Column, Integer, String, DateTime, func, Index, or_, and_, not_) from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import make_transient from tabulate import tabulate +from time import sleep -from airflow import executors, models, settings from airflow import configuration as conf +from airflow import executors, models, settings from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun from airflow.settings import Stats from airflow.task_runner import get_task_runner from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS -from airflow.utils.state import State -from airflow.utils.db import provide_session, pessimistic_connection_handling +from airflow.utils import asciiart from airflow.utils.dag_processing import (AbstractDagFileProcessor, DagFileProcessorManager, SimpleDag, SimpleDagBag, list_py_file_paths) +from airflow.utils.db import provide_session, pessimistic_connection_handling from airflow.utils.email import send_email -from airflow.utils.logging import LoggingMixin -from airflow.utils import asciiart - +from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.state import State Base = models.Base ID_LEN = models.ID_LEN @@ -276,12 +271,14 @@ class BaseJob(Base, LoggingMixin): ["{}".format(x) for x in reset_tis]) session.commit() - self.logger.info("Reset the following {} TaskInstances:\n\t{}" - .format(len(reset_tis), task_instance_str)) + self.logger.info( + "Reset the following %s TaskInstances:\n\t%s", + len(reset_tis), task_instance_str + ) return reset_tis 
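Module-level helpers have no instance to inherit the mixin from, so the hunks above (for example _parse_s3_config in S3_hook.py, the webhdfs Kerberos import guard, and the classmethod BaseHook.get_connection) instantiate a throwaway mixin and borrow its logger. A hedged sketch of that pattern follows; the helper function and its behavior are hypothetical, only the `LoggingMixin().logger` idiom comes from the diffs.

from airflow.utils.log.LoggingMixin import LoggingMixin

def parse_config_example(config_file_name):
    # Hypothetical module-level function: there is no `self` here, so
    # create a bare mixin instance just to obtain a configured logger.
    log = LoggingMixin().logger
    try:
        with open(config_file_name) as f:
            return f.read()
    except IOError:
        log.warning("Could not read config file %s", config_file_name)
        raise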
-class DagFileProcessor(AbstractDagFileProcessor): +class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): """Helps call SchedulerJob.process_file() in a separate process.""" # Counter that increments everytime an instance of this class is created @@ -361,6 +358,8 @@ class DagFileProcessor(AbstractDagFileProcessor): # responsive file tailing parent_dir, _ = os.path.split(log_file) + _log = LoggingMixin().logger + # Create the parent directory for the log file if necessary. if not os.path.isdir(parent_dir): os.makedirs(parent_dir) @@ -385,7 +384,7 @@ class DagFileProcessor(AbstractDagFileProcessor): threading.current_thread().name = thread_name start_time = time.time() - logging.info("Started process (PID=%s) to work on %s", + _log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path) scheduler_job = SchedulerJob(dag_ids=dag_id_white_list) @@ -393,12 +392,13 @@ class DagFileProcessor(AbstractDagFileProcessor): pickle_dags) result_queue.put(result) end_time = time.time() - logging.info("Processing %s took %.3f seconds", - file_path, - end_time - start_time) + _log.info( + "Processing %s took %.3f seconds", + file_path, end_time - start_time + ) except: # Log exceptions through the logging framework. - logging.exception("Got an exception! Propagating...") + _log.exception("Got an exception! Propagating...") raise finally: sys.stdout = original_stdout @@ -438,7 +438,7 @@ class DagFileProcessor(AbstractDagFileProcessor): # Arbitrarily wait 5s for the process to die self._process.join(5) if sigkill and self._process.is_alive(): - logging.warning("Killing PID %s", self._process.pid) + self.logger.warning("Killing PID %s", self._process.pid) os.kill(self._process.pid, signal.SIGKILL) @property @@ -478,7 +478,7 @@ class DagFileProcessor(AbstractDagFileProcessor): if not self._result_queue.empty(): self._result = self._result_queue.get_nowait() self._done = True - logging.debug("Waiting for %s", self._process) + self.logger.debug("Waiting for %s", self._process) self._process.join() return True @@ -488,7 +488,7 @@ class DagFileProcessor(AbstractDagFileProcessor): # Get the object from the queue or else join() can hang. if not self._result_queue.empty(): self._result = self._result_queue.get_nowait() - logging.debug("Waiting for %s", self._process) + self.logger.debug("Waiting for %s", self._process) self._process.join() return True @@ -610,8 +610,10 @@ class SchedulerJob(BaseJob): tasks that should have succeeded in the past hour. """ if not any([ti.sla for ti in dag.tasks]): - self.logger.info("Skipping SLA check for {} because " - "no tasks in DAG have SLAs".format(dag)) + self.logger.info( + "Skipping SLA check for %s because no tasks in DAG have SLAs", + dag + ) return TI = models.TaskInstance @@ -841,8 +843,10 @@ class SchedulerJob(BaseJob): task_start_dates = [t.start_date for t in dag.tasks] if task_start_dates: next_run_date = dag.normalize_schedule(min(task_start_dates)) - self.logger.debug("Next run date based on tasks {}" - .format(next_run_date)) + self.logger.debug( + "Next run date based on tasks %s", + next_run_date + ) else: next_run_date = dag.following_schedule(last_scheduled_run) @@ -859,8 +863,10 @@ class SchedulerJob(BaseJob): if next_run_date == dag.start_date: next_run_date = dag.normalize_schedule(dag.start_date) - self.logger.debug("Dag start date: {}. Next run date: {}" - .format(dag.start_date, next_run_date)) + self.logger.debug( + "Dag start date: %s. 
Next run date: %s", + dag.start_date, next_run_date + ) # don't ever schedule in the future if next_run_date > datetime.now(): @@ -908,11 +914,13 @@ class SchedulerJob(BaseJob): dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session) active_dag_runs = [] for run in dag_runs: - self.logger.info("Examining DAG run {}".format(run)) + self.logger.info("Examining DAG run %s", run) # don't consider runs that are executed in the future if run.execution_date > datetime.now(): - self.logger.error("Execution date is in future: {}" - .format(run.execution_date)) + self.logger.error( + "Execution date is in future: %s", + run.execution_date + ) continue if len(active_dag_runs) >= dag.max_active_runs: @@ -933,7 +941,7 @@ class SchedulerJob(BaseJob): active_dag_runs.append(run) for run in active_dag_runs: - self.logger.debug("Examining active DAG run {}".format(run)) + self.logger.debug("Examining active DAG run: %s", run) # this needs a fresh session sometimes tis get detached tis = run.get_task_instances(state=(State.NONE, State.UP_FOR_RETRY)) @@ -954,7 +962,7 @@ class SchedulerJob(BaseJob): if ti.are_dependencies_met( dep_context=DepContext(flag_upstream_failed=True), session=session): - self.logger.debug('Queuing task: {}'.format(ti)) + self.logger.debug('Queuing task: %s', ti) queue.append(ti.key) session.close() @@ -1012,9 +1020,10 @@ class SchedulerJob(BaseJob): session.commit() if tis_changed > 0: - self.logger.warning("Set {} task instances to state={} as their associated " - "DagRun was not in RUNNING state".format( - tis_changed, new_state)) + self.logger.warning( + "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state", + tis_changed, new_state + ) @provide_session def _find_executable_task_instances(self, simple_dag_bag, states, session=None): @@ -1066,7 +1075,7 @@ class SchedulerJob(BaseJob): # Put one task instance on each line task_instance_str = "\n\t".join( ["{}".format(x) for x in task_instances_to_examine]) - self.logger.info("Tasks up for execution:\n\t{}".format(task_instance_str)) + self.logger.info("Tasks up for execution:\n\t%s", task_instance_str) # Get the pool settings pools = {p.pool: p for p in session.query(models.Pool).all()} @@ -1087,9 +1096,12 @@ class SchedulerJob(BaseJob): open_slots = pools[pool].open_slots(session=session) num_queued = len(task_instances) - self.logger.info("Figuring out tasks to run in Pool(name={pool}) " - "with {open_slots} open slots and {num_queued} " - "task instances in queue".format(**locals())) + self.logger.info( + "Figuring out tasks to run in Pool(name={pool}) with {open_slots} " + "open slots and {num_queued} task instances in queue".format( + **locals() + ) + ) priority_sorted_task_instances = sorted( task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)) @@ -1099,8 +1111,10 @@ class SchedulerJob(BaseJob): for task_instance in priority_sorted_task_instances: if open_slots <= 0: - self.logger.info("Not scheduling since there are {} open slots in pool {}" - .format(open_slots, pool)) + self.logger.info( + "Not scheduling since there are %s open slots in pool %s", + open_slots, pool + ) # Can't schedule any more since there are no more open slots. 
break @@ -1119,25 +1133,23 @@ current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id] task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency - self.logger.info("DAG {} has {}/{} running and queued tasks" - .format(dag_id, - current_task_concurrency, - task_concurrency_limit)) + self.logger.info( + "DAG %s has %s/%s running and queued tasks", + dag_id, current_task_concurrency, task_concurrency_limit + ) if current_task_concurrency >= task_concurrency_limit: - self.logger.info("Not executing {} since the number " - "of tasks running or queued from DAG {}" - " is >= to the " - "DAG's task concurrency limit of {}" - .format(task_instance, - dag_id, - task_concurrency_limit)) + self.logger.info( + "Not executing %s since the number of tasks running or queued from DAG %s" + " is >= to the DAG's task concurrency limit of %s", + task_instance, dag_id, task_concurrency_limit + ) continue - if self.executor.has_task(task_instance): - self.logger.debug(("Not handling task {} as the executor " + - "reports it is running") - .format(task_instance.key)) + self.logger.debug( + "Not handling task %s as the executor reports it is running", + task_instance.key + ) continue executable_tis.append(task_instance) open_slots -= 1 @@ -1145,8 +1157,7 @@ task_instance_str = "\n\t".join( ["{}".format(x) for x in executable_tis]) - self.logger.info("Setting the follow tasks to queued state:\n\t{}" - .format(task_instance_str)) + self.logger.info("Setting the following tasks to queued state:\n\t%s", task_instance_str) # so these dont expire on commit for ti in executable_tis: copy_dag_id = ti.dag_id @@ -1225,8 +1236,7 @@ task_instance_str = "\n\t".join( ["{}".format(x) for x in tis_to_be_queued]) - self.logger.info("Setting the follow tasks to queued state:\n\t{}" - .format(task_instance_str)) + self.logger.info("Setting the following tasks to queued state:\n\t%s", task_instance_str) return tis_to_be_queued def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances): @@ -1258,8 +1268,10 @@ priority = task_instance.priority_weight queue = task_instance.queue - self.logger.info("Sending {} to executor with priority {} and queue {}" - .format(task_instance.key, priority, queue)) + self.logger.info( + "Sending %s to executor with priority %s and queue %s", + task_instance.key, priority, queue + ) # save attributes so sqlalchemy doesnt expire them copy_dag_id = task_instance.dag_id @@ -1345,20 +1357,18 @@ for dag in dags: dag = dagbag.get_dag(dag.dag_id) if dag.is_paused: - self.logger.info("Not processing DAG {} since it's paused" - .format(dag.dag_id)) + self.logger.info("Not processing DAG %s since it's paused", dag.dag_id) continue if not dag: - self.logger.error("DAG ID {} was not found in the DagBag" - .format(dag.dag_id)) + self.logger.error("DAG ID %s was not found in the DagBag", dag.dag_id) continue - self.logger.info("Processing {}".format(dag.dag_id)) + self.logger.info("Processing %s", dag.dag_id) dag_run = self.create_dag_run(dag) if dag_run: - self.logger.info("Created {}".format(dag_run)) + self.logger.info("Created %s", dag_run) self._process_task_instances(dag, tis_out) self.manage_slas(dag) @@ -1374,11 +1384,10 @@ """ for key, executor_state in list(self.executor.get_event_buffer().items()): dag_id, task_id, execution_date = key - self.logger.info("Executor reports {}.{} 
execution_date={} as {}" - .format(dag_id, - task_id, - execution_date, - executor_state)) + self.logger.info( + "Executor reports %s.%s execution_date=%s as %s", + dag_id, task_id, execution_date, executor_state + ) def _log_file_processing_stats(self, known_file_paths, @@ -1450,8 +1459,6 @@ class SchedulerJob(BaseJob): self.logger.info("Starting the scheduler") pessimistic_connection_handling() - logging.basicConfig(level=logging.DEBUG) - # DAGs can be pickled for easier remote execution by some executors pickle_dags = False if self.do_pickle and self.executor.__class__ not in \ @@ -1462,22 +1469,16 @@ class SchedulerJob(BaseJob): # DAGs in parallel. By processing them in separate processes, # we can get parallelism and isolation from potentially harmful # user code. - self.logger.info("Processing files using up to {} processes at a time " - .format(self.max_threads)) - self.logger.info("Running execute loop for {} seconds" - .format(self.run_duration)) - self.logger.info("Processing each file at most {} times" - .format(self.num_runs)) - self.logger.info("Process each file at most once every {} seconds" - .format(self.file_process_interval)) - self.logger.info("Checking for new files in {} every {} seconds" - .format(self.subdir, self.dag_dir_list_interval)) + self.logger.info("Processing files using up to %s processes at a time", self.max_threads) + self.logger.info("Running execute loop for %s seconds", self.run_duration) + self.logger.info("Processing each file at most %s times", self.num_runs) + self.logger.info("Process each file at most once every %s seconds", self.file_process_interval) + self.logger.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval) # Build up a list of Python files that could contain DAGs - self.logger.info("Searching for files in {}".format(self.subdir)) + self.logger.info("Searching for files in %s", self.subdir) known_file_paths = list_py_file_paths(self.subdir) - self.logger.info("There are {} files in {}" - .format(len(known_file_paths), self.subdir)) + self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir) def processor_factory(file_path, log_file_path): return DagFileProcessor(file_path, @@ -1510,23 +1511,22 @@ class SchedulerJob(BaseJob): child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] for child in child_processes: - self.logger.info("Terminating child PID: {}".format(child.pid)) + self.logger.info("Terminating child PID: %s", child.pid) child.terminate() + # TODO: Remove magic number timeout = 5 - self.logger.info("Waiting up to {}s for processes to exit..." 
- .format(timeout)) + self.logger.info("Waiting up to %s seconds for processes to exit...", timeout) try: psutil.wait_procs(child_processes, timeout) except psutil.TimeoutExpired: - self.logger.debug("Ran out of time while waiting for " - "processes to exit") + self.logger.debug("Ran out of time while waiting for processes to exit") # Then SIGKILL child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] if len(child_processes) > 0: for child in child_processes: - self.logger.info("Killing child PID: {}".format(child.pid)) + self.logger.info("Killing child PID: %s", child.pid) child.kill() child.wait() @@ -1568,11 +1568,10 @@ class SchedulerJob(BaseJob): if elapsed_time_since_refresh > self.dag_dir_list_interval: # Build up a list of Python files that could contain DAGs - self.logger.info("Searching for files in {}".format(self.subdir)) + self.logger.info("Searching for files in %s", self.subdir) known_file_paths = list_py_file_paths(self.subdir) last_dag_dir_refresh_time = datetime.now() - self.logger.info("There are {} files in {}" - .format(len(known_file_paths), self.subdir)) + self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir) processor_manager.set_file_paths(known_file_paths) self.logger.debug("Removing old import errors") @@ -1585,8 +1584,7 @@ class SchedulerJob(BaseJob): if self.using_sqlite: # For the sqlite case w/ 1 thread, wait until the processor # is finished to avoid concurrent access to the DB. - self.logger.debug("Waiting for processors to finish since we're " - "using sqlite") + self.logger.debug("Waiting for processors to finish since we're using sqlite") processor_manager.wait_until_finished() # Send tasks for execution if available @@ -1638,16 +1636,13 @@ class SchedulerJob(BaseJob): last_stat_print_time = datetime.now() loop_end_time = time.time() - self.logger.debug("Ran scheduling loop in {:.2f}s" - .format(loop_end_time - loop_start_time)) - self.logger.debug("Sleeping for {:.2f}s" - .format(self._processor_poll_interval)) + self.logger.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time) + self.logger.debug("Sleeping for %.2f seconds", self._processor_poll_interval) time.sleep(self._processor_poll_interval) # Exit early for a test mode if processor_manager.max_runs_reached(): - self.logger.info("Exiting loop as all files have been processed " - "{} times".format(self.num_runs)) + self.logger.info("Exiting loop as all files have been processed %s times", self.num_runs) break # Stop any processors @@ -1662,8 +1657,10 @@ class SchedulerJob(BaseJob): all_files_processed = False break if all_files_processed: - self.logger.info("Deactivating DAGs that haven't been touched since {}" - .format(execute_start_time.isoformat())) + self.logger.info( + "Deactivating DAGs that haven't been touched since %s", + execute_start_time.isoformat() + ) models.DAG.deactivate_stale_dags(execute_start_time) self.executor.end() @@ -1696,23 +1693,21 @@ class SchedulerJob(BaseJob): :return: a list of SimpleDags made from the Dags found in the file :rtype: list[SimpleDag] """ - self.logger.info("Processing file {} for tasks to queue".format(file_path)) + self.logger.info("Processing file %s for tasks to queue", file_path) # As DAGs are parsed from this file, they will be converted into SimpleDags simple_dags = [] try: dagbag = models.DagBag(file_path) except Exception: - self.logger.exception("Failed at reloading the DAG file {}".format(file_path)) + self.logger.exception("Failed at 
reloading the DAG file %s", file_path) Stats.incr('dag_file_refresh_error', 1, 1) return [] if len(dagbag.dags) > 0: - self.logger.info("DAG(s) {} retrieved from {}" - .format(dagbag.dags.keys(), - file_path)) + self.logger.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path) else: - self.logger.warning("No viable dags retrieved from {}".format(file_path)) + self.logger.warning("No viable dags retrieved from %s", file_path) self.update_import_errors(session, dagbag) return [] @@ -1783,7 +1778,7 @@ class SchedulerJob(BaseJob): ti.state = State.SCHEDULED # Also save this task instance to the DB. - self.logger.info("Creating / updating {} in ORM".format(ti)) + self.logger.info("Creating / updating %s in ORM", ti) session.merge(ti) session.commit() @@ -1914,25 +1909,22 @@ class BackfillJob(BaseJob): ti.refresh_from_db() if ti.state == State.SUCCESS: ti_status.succeeded.add(key) - self.logger.debug("Task instance {} succeeded. " - "Don't rerun.".format(ti)) + self.logger.debug("Task instance %s succeeded. Don't rerun.", ti) ti_status.started.pop(key) continue elif ti.state == State.SKIPPED: ti_status.skipped.add(key) - self.logger.debug("Task instance {} skipped. " - "Don't rerun.".format(ti)) + self.logger.debug("Task instance %s skipped. Don't rerun.", ti) ti_status.started.pop(key) continue elif ti.state == State.FAILED: - self.logger.error("Task instance {} failed".format(ti)) + self.logger.error("Task instance %s failed", ti) ti_status.failed.add(key) ti_status.started.pop(key) continue # special case: if the task needs to run again put it back elif ti.state == State.UP_FOR_RETRY: - self.logger.warning("Task instance {} is up for retry" - .format(ti)) + self.logger.warning("Task instance %s is up for retry", ti) ti_status.started.pop(key) ti_status.to_run[key] = ti # special case: The state of the task can be set to NONE by the task itself @@ -1941,9 +1933,11 @@ class BackfillJob(BaseJob): # for that as otherwise those tasks would fall outside of the scope of # the backfill suddenly. elif ti.state == State.NONE: - self.logger.warning("FIXME: task instance {} state was set to " - "None externally or reaching concurrency limits. " - "Re-adding task to queue.".format(ti)) + self.logger.warning( + "FIXME: task instance %s state was set to none externally or " + "reaching concurrency limits. Re-adding task to queue.", + ti + ) session = settings.Session() ti.set_state(State.SCHEDULED, session=session) session.close() @@ -1960,14 +1954,16 @@ class BackfillJob(BaseJob): for key, state in list(executor.get_event_buffer().items()): if key not in started: - self.logger.warning("{} state {} not in started={}" - .format(key, state, started.values())) + self.logger.warning( + "%s state %s not in started=%s", + key, state, started.values() + ) continue ti = started[key] ti.refresh_from_db() - self.logger.debug("Executor state: {} task {}".format(state, ti)) + self.logger.debug("Executor state: %s task %s", state, ti) if state == State.FAILED or state == State.SUCCESS: if ti.state == State.RUNNING or ti.state == State.QUEUED: @@ -2090,9 +2086,10 @@ class BackfillJob(BaseJob): len(ti_status.not_ready)) self.logger.info(msg) - self.logger.debug("Finished dag run loop iteration. " - "Remaining tasks {}" - .format(ti_status.to_run.values())) + self.logger.debug( + "Finished dag run loop iteration. 
Remaining tasks %s", + ti_status.to_run.values() + ) @provide_session def _process_backfill_task_instances(self, @@ -2142,43 +2139,41 @@ class BackfillJob(BaseJob): ignore_depends_on_past = ( self.ignore_first_depends_on_past and ti.execution_date == (start_date or ti.start_date)) - self.logger.debug("Task instance to run {} state {}" - .format(ti, ti.state)) + self.logger.debug("Task instance to run %s state %s", ti, ti.state) # guard against externally modified tasks instances or # in case max concurrency has been reached at task runtime if ti.state == State.NONE: - self.logger.warning("FIXME: task instance {} state was set to " - "None externally. This should not happen") + self.logger.warning( + "FIXME: task instance {} state was set to None externally. This should not happen" + ) ti.set_state(State.SCHEDULED, session=session) # The task was already marked successful or skipped by a # different Job. Don't rerun it. if ti.state == State.SUCCESS: ti_status.succeeded.add(key) - self.logger.debug("Task instance {} succeeded. " - "Don't rerun.".format(ti)) + self.logger.debug("Task instance %s succeeded. Don't rerun.", ti) ti_status.to_run.pop(key) if key in ti_status.started: ti_status.started.pop(key) continue elif ti.state == State.SKIPPED: ti_status.skipped.add(key) - self.logger.debug("Task instance {} skipped. " - "Don't rerun.".format(ti)) + self.logger.debug("Task instance %s skipped. Don't rerun.", ti) ti_status.to_run.pop(key) if key in ti_status.started: ti_status.started.pop(key) continue elif ti.state == State.FAILED: - self.logger.error("Task instance {} failed".format(ti)) + self.logger.error("Task instance %s failed", ti) ti_status.failed.add(key) ti_status.to_run.pop(key) if key in ti_status.started: ti_status.started.pop(key) continue elif ti.state == State.UPSTREAM_FAILED: - self.logger.error("Task instance {} upstream failed".format(ti)) + self.logger.error("Task instance %s upstream failed", ti) ti_status.failed.add(key) ti_status.to_run.pop(key) if key in ti_status.started: @@ -2200,10 +2195,12 @@ class BackfillJob(BaseJob): ti.refresh_from_db(lock_for_update=True, session=session) if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY: if executor.has_task(ti): - self.logger.debug("Task Instance {} already in executor " - "waiting for queue to clear".format(ti)) + self.logger.debug( + "Task Instance %s already in executor waiting for queue to clear", + ti + ) else: - self.logger.debug('Sending {} to executor'.format(ti)) + self.logger.debug('Sending %s to executor', ti) # Skip scheduled state, we are executing immediately ti.state = State.QUEUED session.merge(ti) @@ -2220,7 +2217,7 @@ class BackfillJob(BaseJob): continue if ti.state == State.UPSTREAM_FAILED: - self.logger.error("Task instance {} upstream failed".format(ti)) + self.logger.error("Task instance %s upstream failed", ti) ti_status.failed.add(key) ti_status.to_run.pop(key) if key in ti_status.started: @@ -2229,15 +2226,14 @@ class BackfillJob(BaseJob): # special case if ti.state == State.UP_FOR_RETRY: - self.logger.debug("Task instance {} retry period not expired yet" - .format(ti)) + self.logger.debug("Task instance %s retry period not expired yet", ti) if key in ti_status.started: ti_status.started.pop(key) ti_status.to_run[key] = ti continue # all remaining tasks - self.logger.debug('Adding {} to not_ready'.format(ti)) + self.logger.debug('Adding %s to not_ready', ti) ti_status.not_ready.add(key) # execute the tasks in the queue @@ -2250,8 +2246,10 @@ class BackfillJob(BaseJob): if 
(ti_status.not_ready and ti_status.not_ready == set(ti_status.to_run) and len(ti_status.started) == 0): - self.logger.warning("Deadlock discovered for ti_status.to_run={}" - .format(ti_status.to_run.values())) + self.logger.warning( + "Deadlock discovered for ti_status.to_run=%s", + ti_status.to_run.values() + ) ti_status.deadlocked.update(ti_status.to_run.values()) ti_status.to_run.clear() @@ -2284,7 +2282,7 @@ class BackfillJob(BaseJob): if ti_status.failed: err += ( "---------------------------------------------------\n" - "Some task instances failed:\n{}\n".format(ti_status.failed)) + "Some task instances failed:\n%s\n".format(ti_status.failed)) if ti_status.deadlocked: err += ( '---------------------------------------------------\n' @@ -2367,8 +2365,7 @@ class BackfillJob(BaseJob): run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date) if len(run_dates) == 0: - self.logger.info("No run dates were found for the given dates and dag " - "interval.") + self.logger.info("No run dates were found for the given dates and dag interval.") return # picklin' @@ -2406,9 +2403,11 @@ class BackfillJob(BaseJob): raise AirflowException(err) if remaining_dates > 0: - self.logger.info(("max_active_runs limit for dag {} has been reached " - " - waiting for other dag runs to finish") - .format(self.dag_id)) + self.logger.info( + "max_active_runs limit for dag %s has been reached " + " - waiting for other dag runs to finish", + self.dag_id + ) time.sleep(self.delay_on_limit_secs) finally: executor.end() @@ -2454,8 +2453,8 @@ class LocalTaskJob(BaseJob): self.task_runner = get_task_runner(self) def signal_handler(signum, frame): - '''Setting kill signal handler''' - logging.error("Killing subprocess") + """Setting kill signal handler""" + self.logger.error("Killing subprocess") self.on_kill() raise AirflowException("LocalTaskJob received SIGTERM signal") signal.signal(signal.SIGTERM, signal_handler) @@ -2468,8 +2467,8 @@ class LocalTaskJob(BaseJob): ignore_ti_state=self.ignore_ti_state, job_id=self.id, pool=self.pool): - self.logger.info("Task is not able to be run") - return + self.logger.info("Task is not able to be run") + return try: self.task_runner.start() @@ -2481,8 +2480,7 @@ class LocalTaskJob(BaseJob): # Monitor the task to see if it's done return_code = self.task_runner.return_code() if return_code is not None: - self.logger.info("Task exited with return code {}" - .format(return_code)) + self.logger.info("Task exited with return code %s", return_code) return # Periodically heartbeat so that the scheduler doesn't think this @@ -2492,8 +2490,10 @@ class LocalTaskJob(BaseJob): last_heartbeat_time = time.time() except OperationalError: Stats.incr('local_task_job_heartbeat_failure', 1, 1) - self.logger.exception("Exception while trying to heartbeat! " - "Sleeping for {}s".format(self.heartrate)) + self.logger.exception( + "Exception while trying to heartbeat! 
Sleeping for %s seconds", + self.heartrate + ) time.sleep(self.heartrate) # If it's been too long since we've heartbeat, then it's possible that @@ -2531,19 +2531,20 @@ class LocalTaskJob(BaseJob): if ti.state == State.RUNNING: if not same_hostname: - logging.warning("The recorded hostname {ti.hostname} " + self.logger.warning("The recorded hostname {ti.hostname} " "does not match this instance's hostname " "{fqdn}".format(**locals())) raise AirflowException("Hostname of job runner does not match") elif not same_process: current_pid = os.getpid() - logging.warning("Recorded pid {ti.pid} does not match the current pid " + self.logger.warning("Recorded pid {ti.pid} does not match the current pid " "{current_pid}".format(**locals())) raise AirflowException("PID of job runner does not match") elif (self.task_runner.return_code() is None and hasattr(self.task_runner, 'process')): - logging.warning( - "State of this instance has been externally set to " - "{}. Taking the poison pill. So long.".format(ti.state)) + self.logger.warning( + "State of this instance has been externally set to %s. Taking the poison pill.", + ti.state + ) self.task_runner.terminate() self.terminating = True
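Because BaseHook, BaseJob, and the executors now extend LoggingMixin, downstream code gets self.logger for free and the module-level `import logging` boilerplate disappears. Below is a sketch of how a custom hook would pick this up after this commit; the hook class and connection id are illustrative, not part of the diff.

from airflow.hooks.base_hook import BaseHook

class ExampleHook(BaseHook):
    """Illustrative subclass; BaseHook now mixes in LoggingMixin."""

    def __init__(self, example_conn_id='example_default'):
        self.example_conn_id = example_conn_id

    def check(self):
        # get_connection() is the classmethod shown in the base_hook hunk.
        conn = self.get_connection(self.example_conn_id)
        # self.logger is inherited from LoggingMixin -- no `import logging`
        # needed, and arguments are passed lazily per the new convention.
        self.logger.info("Using connection to host %s", conn.host)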
