http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 3078f4e..39e65e8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -35,7 +35,6 @@ import inspect import zipfile import jinja2 import json -import logging import os import pickle import re @@ -75,11 +74,11 @@ from airflow.utils.decorators import apply_defaults from airflow.utils.email import send_email from airflow.utils.helpers import ( as_tuple, is_container, is_in, validate_key, pprinttable) -from airflow.utils.logging import LoggingMixin from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout from airflow.utils.trigger_rule import TriggerRule +from airflow.utils.log.LoggingMixin import LoggingMixin Base = declarative_base() ID_LEN = 250 @@ -87,7 +86,6 @@ XCOM_RETURN_KEY = 'return_value' Stats = settings.Stats - def get_fernet(): """ Deferred load of Fernet key. @@ -180,6 +178,7 @@ class DagBag(BaseDagBag, LoggingMixin): by the scheduler job only :type sync_to_db: bool """ + def __init__( self, dag_folder=None, @@ -190,7 +189,7 @@ class DagBag(BaseDagBag, LoggingMixin): if executor is None: executor = GetDefaultExecutor() dag_folder = dag_folder or settings.DAGS_FOLDER - self.logger.info("Filling up the DagBag from {}".format(dag_folder)) + self.logger.info("Filling up the DagBag from %s", dag_folder) self.dag_folder = dag_folder self.dags = {} # the file's last modified timestamp when we last read it @@ -263,7 +262,7 @@ class DagBag(BaseDagBag, LoggingMixin): return found_dags except Exception as e: - logging.exception(e) + self.logger.exception(e) return found_dags mods = [] @@ -275,7 +274,7 @@ class DagBag(BaseDagBag, LoggingMixin): self.file_last_changed[filepath] = file_last_changed_on_disk return found_dags - self.logger.debug("Importing {}".format(filepath)) + self.logger.debug("Importing %s", filepath) org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1]) mod_name = ('unusual_prefix_' + hashlib.sha1(filepath.encode('utf-8')).hexdigest() + @@ -289,7 +288,7 @@ class DagBag(BaseDagBag, LoggingMixin): m = imp.load_source(mod_name, filepath) mods.append(m) except Exception as e: - self.logger.exception("Failed to import: " + filepath) + self.logger.exception("Failed to import: %s", filepath) self.import_errors[filepath] = str(e) self.file_last_changed[filepath] = file_last_changed_on_disk @@ -300,13 +299,10 @@ class DagBag(BaseDagBag, LoggingMixin): mod_name, ext = os.path.splitext(mod.filename) if not head and (ext == '.py' or ext == '.pyc'): if mod_name == '__init__': - self.logger.warning("Found __init__.{0} at root of {1}". - format(ext, filepath)) - + self.logger.warning("Found __init__.%s at root of %s", ext, filepath) if safe_mode: with zip_file.open(mod.filename) as zf: - self.logger.debug("Reading {} from {}". 
- format(mod.filename, filepath)) + self.logger.debug("Reading %s from %s", mod.filename, filepath) content = zf.read() if not all([s in content for s in (b'DAG', b'airflow')]): self.file_last_changed[filepath] = ( @@ -322,7 +318,7 @@ class DagBag(BaseDagBag, LoggingMixin): m = importlib.import_module(mod_name) mods.append(m) except Exception as e: - self.logger.exception("Failed to import: " + filepath) + self.logger.exception("Failed to import: %s", filepath) self.import_errors[filepath] = str(e) self.file_last_changed[filepath] = file_last_changed_on_disk @@ -347,11 +343,9 @@ class DagBag(BaseDagBag, LoggingMixin): from airflow.jobs import LocalTaskJob as LJ self.logger.info("Finding 'running' jobs without a recent heartbeat") TI = TaskInstance - secs = ( - configuration.getint('scheduler', 'scheduler_zombie_task_threshold')) + secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold') limit_dttm = datetime.now() - timedelta(seconds=secs) - self.logger.info( - "Failing jobs without heartbeat after {}".format(limit_dttm)) + self.logger.info("Failing jobs without heartbeat after %s", limit_dttm) tis = ( session.query(TI) @@ -371,9 +365,8 @@ class DagBag(BaseDagBag, LoggingMixin): if ti.task_id in dag.task_ids: task = dag.get_task(ti.task_id) ti.task = task - ti.handle_failure("{} killed as zombie".format(ti)) - self.logger.info( - 'Marked zombie job {} as failed'.format(ti)) + ti.handle_failure("{} killed as zombie".format(str(ti))) + self.logger.info('Marked zombie job %s as failed', ti) Stats.incr('zombies_killed') session.commit() @@ -451,7 +444,7 @@ class DagBag(BaseDagBag, LoggingMixin): str([dag.dag_id for dag in found_dags]), )) except Exception as e: - logging.warning(e) + self.logger.warning(e) Stats.gauge( 'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1) Stats.gauge( @@ -619,7 +612,7 @@ class Connection(Base): self.is_encrypted = True except AirflowException: self.logger.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") + "using non-encrypted value.") self._password = value self.is_encrypted = False @@ -648,7 +641,7 @@ class Connection(Base): self.is_extra_encrypted = True except AirflowException: self.logger.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") + "using non-encrypted value.") self._extra = value self.is_extra_encrypted = False @@ -718,8 +711,8 @@ class Connection(Base): try: obj = json.loads(self.extra) except Exception as e: - logging.exception(e) - logging.error("Failed parsing the json for conn_id %s", self.conn_id) + self.logger.exception(e) + self.logger.error("Failed parsing the json for conn_id %s", self.conn_id) return obj @@ -750,7 +743,7 @@ class DagPickle(Base): self.pickle = dag -class TaskInstance(Base): +class TaskInstance(Base, LoggingMixin): """ Task instances store the state of a task instance. This table is the authority and single source of truth around what tasks have run and the @@ -764,7 +757,6 @@ class TaskInstance(Base): even while multiple schedulers may be firing task instances. """ - __tablename__ = "task_instance" task_id = Column(String(ID_LEN), primary_key=True) @@ -1014,7 +1006,7 @@ class TaskInstance(Base): """ Forces the task instance's state to FAILED in the database. 
""" - logging.error("Recording the task instance as FAILED") + self.logger.error("Recording the task instance as FAILED") self.state = State.FAILED session.merge(self) session.commit() @@ -1165,14 +1157,16 @@ class TaskInstance(Base): session=session): failed = True if verbose: - logging.info("Dependencies not met for {}, dependency '{}' FAILED: {}" - .format(self, dep_status.dep_name, dep_status.reason)) + self.logger.info( + "Dependencies not met for %s, dependency '%s' FAILED: %s", + self, dep_status.dep_name, dep_status.reason + ) if failed: return False if verbose: - logging.info("Dependencies all met for {}".format(self)) + self.logger.info("Dependencies all met for %s", self) return True @@ -1188,11 +1182,10 @@ class TaskInstance(Base): session, dep_context): - logging.debug("{} dependency '{}' PASSED: {}, {}" - .format(self, - dep_status.dep_name, - dep_status.passed, - dep_status.reason)) + self.logger.debug( + "%s dependency '%s' PASSED: %s, %s", + self, dep_status.dep_name, dep_status.passed, dep_status.reason + ) if not dep_status.passed: yield dep_status @@ -1335,6 +1328,7 @@ class TaskInstance(Base): session.commit() return False + #TODO: Logging needs cleanup, not clear what is being printed hr = "\n" + ("-" * 80) + "\n" # Line break # For reporting purposes, we report based on 1-indexed, @@ -1365,11 +1359,10 @@ class TaskInstance(Base): "runtime. Attempt {attempt} of {total}. State set to NONE.").format( attempt=self.try_number + 1, total=self.max_tries + 1) - logging.warning(hr + msg + hr) + self.logger.warning(hr + msg + hr) self.queued_dttm = datetime.now() - msg = "Queuing into pool {}".format(self.pool) - logging.info(msg) + self.logger.info("Queuing into pool %s", self.pool) session.merge(self) session.commit() return False @@ -1378,12 +1371,12 @@ class TaskInstance(Base): # the current worker process was blocked on refresh_from_db if self.state == State.RUNNING: msg = "Task Instance already running {}".format(self) - logging.warning(msg) + self.logger.warning(msg) session.commit() return False # print status message - logging.info(hr + msg + hr) + self.logger.info(hr + msg + hr) self.try_number += 1 if not test_mode: @@ -1401,10 +1394,10 @@ class TaskInstance(Base): if verbose: if mark_success: msg = "Marking success for {} on {}".format(self.task, self.execution_date) - logging.info(msg) + self.logger.info(msg) else: msg = "Executing {} on {}".format(self.task, self.execution_date) - logging.info(msg) + self.logger.info(msg) return True @provide_session @@ -1445,8 +1438,8 @@ class TaskInstance(Base): self.task = task_copy def signal_handler(signum, frame): - '''Setting kill signal handler''' - logging.error("Killing subprocess") + """Setting kill signal handler""" + self.logger.error("Killing subprocess") task_copy.on_kill() raise AirflowException("Task received SIGTERM signal") signal.signal(signal.SIGTERM, signal_handler) @@ -1525,8 +1518,8 @@ class TaskInstance(Base): if task.on_success_callback: task.on_success_callback(context) except Exception as e3: - logging.error("Failed when executing success callback") - logging.exception(e3) + self.logger.error("Failed when executing success callback") + self.logger.exception(e3) session.commit() @@ -1571,7 +1564,7 @@ class TaskInstance(Base): task_copy.dry_run() def handle_failure(self, error, test_mode=False, context=None): - logging.exception(error) + self.logger.exception(error) task = self.task session = settings.Session() self.end_date = datetime.now() @@ -1592,21 +1585,20 @@ class TaskInstance(Base): # next task 
instance try_number exceeds the max_tries. if task.retries and self.try_number <= self.max_tries: self.state = State.UP_FOR_RETRY - logging.info('Marking task as UP_FOR_RETRY') + self.logger.info('Marking task as UP_FOR_RETRY') if task.email_on_retry and task.email: self.email_alert(error, is_retry=True) else: self.state = State.FAILED if task.retries: - logging.info('All retries failed; marking task as FAILED') + self.logger.info('All retries failed; marking task as FAILED') else: - logging.info('Marking task as FAILED.') + self.logger.info('Marking task as FAILED.') if task.email_on_failure and task.email: self.email_alert(error, is_retry=False) except Exception as e2: - logging.error( - 'Failed to send email to: ' + str(task.email)) - logging.exception(e2) + self.logger.error('Failed to send email to: %s', task.email) + self.logger.exception(e2) # Handling callbacks pessimistically try: @@ -1615,13 +1607,13 @@ class TaskInstance(Base): if self.state == State.FAILED and task.on_failure_callback: task.on_failure_callback(context) except Exception as e3: - logging.error("Failed at executing callback") - logging.exception(e3) + self.logger.error("Failed at executing callback") + self.logger.exception(e3) if not test_mode: session.merge(self) session.commit() - logging.error(str(error)) + self.logger.error(str(error)) @provide_session def get_template_context(self, session=None): @@ -1939,7 +1931,7 @@ class SkipMixin(object): else: assert execution_date is not None, "Execution date is None and no dag run" - logging.warning("No DAG RUN present this should not happen") + self.logger.warning("No DAG RUN present this should not happen") # this is defensive against dag runs that are not complete for task in tasks: ti = TaskInstance(task, execution_date=execution_date) @@ -1953,7 +1945,7 @@ class SkipMixin(object): @functools.total_ordering -class BaseOperator(object): +class BaseOperator(LoggingMixin): """ Abstract base class for all operators. 
Since operators create objects that become node in the dag, BaseOperator contains many recursive methods for @@ -2134,8 +2126,7 @@ class BaseOperator(object): self.email_on_failure = email_on_failure self.start_date = start_date if start_date and not isinstance(start_date, datetime): - logging.warning( - "start_date for {} isn't datetime.datetime".format(self)) + self.logger.warning("start_date for %s isn't datetime.datetime", self) self.end_date = end_date if not TriggerRule.is_valid(trigger_rule): raise AirflowException( @@ -2151,10 +2142,12 @@ class BaseOperator(object): self.depends_on_past = True if schedule_interval: - logging.warning( + self.logger.warning( "schedule_interval is used for {}, though it has " "been deprecated as a task parameter, you need to " - "specify it as a DAG parameter instead".format(self)) + "specify it as a DAG parameter instead", + self + ) self._schedule_interval = schedule_interval self.retries = retries self.queue = queue @@ -2167,7 +2160,7 @@ class BaseOperator(object): if isinstance(retry_delay, timedelta): self.retry_delay = retry_delay else: - logging.debug("retry_delay isn't timedelta object, assuming secs") + self.logger.debug("Retry_delay isn't timedelta object, assuming secs") self.retry_delay = timedelta(seconds=retry_delay) self.retry_exponential_backoff = retry_exponential_backoff self.max_retry_delay = max_retry_delay @@ -2467,7 +2460,7 @@ class BaseOperator(object): try: setattr(self, attr, env.loader.get_source(env, content)[0]) except Exception as e: - logging.exception(e) + self.logger.exception(e) self.prepare_template() @property @@ -2586,12 +2579,12 @@ class BaseOperator(object): ignore_ti_state=ignore_ti_state) def dry_run(self): - logging.info('Dry run') + self.logger.info('Dry run') for attr in self.template_fields: content = getattr(self, attr) if content and isinstance(content, six.string_types): - logging.info('Rendering template for {0}'.format(attr)) - logging.info(content) + self.logger.info('Rendering template for %s', attr) + self.logger.info(content) def get_direct_relatives(self, upstream=False): """ @@ -2753,7 +2746,7 @@ class DagModel(Base): @functools.total_ordering -class DAG(BaseDag, LoggingMixin): +class DAG(BaseDag): """ A dag (directed acyclic graph) is a collection of tasks with directional dependencies. 
A dag also has a schedule, a start end an end date @@ -3529,7 +3522,8 @@ class DAG(BaseDag, LoggingMixin): d['pickle_len'] = len(pickled) d['pickling_duration'] = "{}".format(datetime.now() - dttm) except Exception as e: - logging.debug(e) + log = LoggingMixin().logger + log.debug(e) d['is_picklable'] = False d['stacktrace'] = traceback.format_exc() return d @@ -3757,12 +3751,13 @@ class DAG(BaseDag, LoggingMixin): :type sync_time: datetime :return: None """ + orm_dag = session.query( DagModel).filter(DagModel.dag_id == dag.dag_id).first() if not orm_dag: orm_dag = DagModel(dag_id=dag.dag_id) - logging.info("Creating ORM DAG for %s", - dag.dag_id) + log = LoggingMixin().logger + log.info("Creating ORM DAG for %s", dag.dag_id) orm_dag.fileloc = dag.fileloc orm_dag.is_subdag = dag.is_subdag orm_dag.owners = owner @@ -3805,13 +3800,14 @@ class DAG(BaseDag, LoggingMixin): :type expiration_date: datetime :return: None """ + logger = LoggingMixin().logger for dag in session.query( DagModel).filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active).all(): - logging.info("Deactivating DAG ID %s since it was last touched " - "by the scheduler at %s", - dag.dag_id, - dag.last_scheduler_run.isoformat()) + logger.info( + "Deactivating DAG ID %s since it was last touched by the scheduler at %s", + dag.dag_id, dag.last_scheduler_run.isoformat() + ) dag.is_active = False session.merge(dag) session.commit() @@ -3901,7 +3897,7 @@ class KnownEvent(Base): return self.label -class Variable(Base): +class Variable(Base, LoggingMixin): __tablename__ = "variable" id = Column(Integer, primary_key=True) @@ -3937,8 +3933,9 @@ class Variable(Base): self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = True except AirflowException: - self.logger.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") + self.logger.exception( + "Failed to load fernet while encrypting value, using non-encrypted value." + ) self._val = value self.is_encrypted = False @@ -4005,7 +4002,7 @@ class Variable(Base): session.flush() -class XCom(Base): +class XCom(Base, LoggingMixin): """ Base class for XCom objects. """ @@ -4061,10 +4058,11 @@ class XCom(Base): try: value = json.dumps(value).encode('UTF-8') except ValueError: - logging.error("Could not serialize the XCOM value into JSON. " - "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config.") + log = LoggingMixin().logger + log.error("Could not serialize the XCOM value into JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") raise # remove any duplicate XComs @@ -4131,10 +4129,11 @@ class XCom(Base): try: return json.loads(result.value.decode('UTF-8')) except ValueError: - logging.error("Could not serialize the XCOM value into JSON. " - "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config.") + log = LoggingMixin().logger + log.error("Could not serialize the XCOM value into JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") raise @classmethod @@ -4180,10 +4179,11 @@ class XCom(Base): try: result.value = json.loads(result.value.decode('UTF-8')) except ValueError: - logging.error("Could not serialize the XCOM value into JSON. 
" - "If you are using pickles instead of JSON " - "for XCOM, then you need to enable pickle " - "support for XCOM in your airflow config.") + log = LoggingMixin().logger + log.error("Could not serialize the XCOM value into JSON. " + "If you are using pickles instead of JSON " + "for XCOM, then you need to enable pickle " + "support for XCOM in your airflow config.") raise return results @@ -4235,8 +4235,9 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - logging.warning("Could not update dag stats for {}".format(dag_id)) - logging.exception(e) + log = LoggingMixin().logger + log.warning("Could not update dag stats for %s", dag_id) + log.exception(e) @staticmethod @provide_session @@ -4287,8 +4288,9 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - logging.warning("Could not update dag stat table") - logging.exception(e) + log = LoggingMixin().logger + log.warning("Could not update dag stat table") + log.exception(e) @staticmethod @provide_session @@ -4310,11 +4312,12 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - logging.warning("Could not create stat record") - logging.exception(e) + log = LoggingMixin().logger + log.warning("Could not create stat record") + log.exception(e) -class DagRun(Base): +class DagRun(Base, LoggingMixin): """ DagRun describes an instance of a Dag. It can be created by the scheduler (for regular runs) or by an external trigger @@ -4527,8 +4530,7 @@ class DagRun(Base): tis = self.get_task_instances(session=session) - logging.info("Updating state for {} considering {} task(s)" - .format(self, len(tis))) + self.logger.info("Updating state for %s considering %s task(s)", self, len(tis)) for ti in list(tis): # skip in db? @@ -4574,19 +4576,18 @@ class DagRun(Base): # if all roots finished and at least on failed, the run failed if (not unfinished_tasks and any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): - logging.info('Marking run {} failed'.format(self)) + self.logger.info('Marking run %s failed', self) self.state = State.FAILED # if all roots succeeded and no unfinished tasks, the run succeeded elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) for r in roots): - logging.info('Marking run {} successful'.format(self)) + self.logger.info('Marking run %s successful', self) self.state = State.SUCCESS # if *all tasks* are deadlocked, the run failed elif unfinished_tasks and none_depends_on_past and no_dependencies_met: - logging.info( - 'Deadlock; marking run {} failed'.format(self)) + self.logger.info('Deadlock; marking run %s failed', self) self.state = State.FAILED # finally, if the roots aren't done, the dag is still running
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index 3146cd6..63321fb 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -16,7 +16,6 @@ from builtins import bytes import os import signal -import logging from subprocess import Popen, STDOUT, PIPE from tempfile import gettempdir, NamedTemporaryFile @@ -68,7 +67,7 @@ class BashOperator(BaseOperator): which will be cleaned afterwards """ bash_command = self.bash_command - logging.info("tmp dir root location: \n" + gettempdir()) + self.logger.info("Tmp dir root location: \n %s", gettempdir()) with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: @@ -76,9 +75,11 @@ class BashOperator(BaseOperator): f.flush() fname = f.name script_location = tmp_dir + "/" + fname - logging.info("Temporary script " - "location :{0}".format(script_location)) - logging.info("Running command: " + bash_command) + self.logger.info( + "Temporary script location: %s", + script_location + ) + self.logger.info("Running command: %s", bash_command) sp = Popen( ['bash', fname], stdout=PIPE, stderr=STDOUT, @@ -87,14 +88,16 @@ class BashOperator(BaseOperator): self.sp = sp - logging.info("Output:") + self.logger.info("Output:") line = '' for line in iter(sp.stdout.readline, b''): line = line.decode(self.output_encoding).strip() - logging.info(line) + self.logger.info(line) sp.wait() - logging.info("Command exited with " - "return code {0}".format(sp.returncode)) + self.logger.info( + "Command exited with return code %s", + sp.returncode + ) if sp.returncode: raise AirflowException("Bash command failed") @@ -103,6 +106,6 @@ class BashOperator(BaseOperator): return line def on_kill(self): - logging.info('Sending SIGTERM signal to bash process group') + self.logger.info('Sending SIGTERM signal to bash process group') os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index 1cf50da..f263a2c 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -14,7 +14,6 @@ from builtins import zip from builtins import str -import logging from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook @@ -72,15 +71,15 @@ class CheckOperator(BaseOperator): self.sql = sql def execute(self, context=None): - logging.info('Executing SQL check: ' + self.sql) + self.logger.info('Executing SQL check: %s', self.sql) records = self.get_db_hook().get_first(self.sql) - logging.info("Record: " + str(records)) + self.logger.info('Record: %s', records) if not records: raise AirflowException("The query returned None") elif not all([bool(r) for r in records]): exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}" raise AirflowException(exceptstr.format(q=self.sql, r=records)) - logging.info("Success.") + self.logger.info("Success.") def get_db_hook(self): return BaseHook.get_hook(conn_id=self.conn_id) @@ -135,7 +134,7 @@ class ValueCheckOperator(BaseOperator): self.has_tolerance = self.tol is not None def execute(self, context=None): - logging.info('Executing SQL 
check: ' + self.sql) + self.logger.info('Executing SQL check: %s', self.sql) records = self.get_db_hook().get_first(self.sql) if not records: raise AirflowException("The query returned None") @@ -209,9 +208,9 @@ class IntervalCheckOperator(BaseOperator): def execute(self, context=None): hook = self.get_db_hook() - logging.info('Executing SQL check: ' + self.sql2) + self.logger.info('Executing SQL check: %s', self.sql2) row2 = hook.get_first(self.sql2) - logging.info('Executing SQL check: ' + self.sql1) + self.logger.info('Executing SQL check: %s', self.sql1) row1 = hook.get_first(self.sql1) if not row2: raise AirflowException("The query {q} returned None".format(q=self.sql2)) @@ -231,19 +230,20 @@ class IntervalCheckOperator(BaseOperator): else: ratio = float(max(current[m], reference[m])) / \ min(current[m], reference[m]) - logging.info(rlog.format(m, ratio, self.metrics_thresholds[m])) + self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m])) ratios[m] = ratio test_results[m] = ratio < self.metrics_thresholds[m] if not all(test_results.values()): failed_tests = [it[0] for it in test_results.items() if not it[1]] j = len(failed_tests) n = len(self.metrics_sorted) - logging.warning(countstr.format(**locals())) + self.logger.warning(countstr.format(**locals())) for k in failed_tests: - logging.warning(fstr.format(k=k, r=ratios[k], - tr=self.metrics_thresholds[k])) + self.logger.warning( + fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k]) + ) raise AirflowException(estr.format(", ".join(failed_tests))) - logging.info("All tests have passed") + self.logger.info("All tests have passed") def get_db_hook(self): return BaseHook.get_hook(conn_id=self.conn_id) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/dagrun_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index c3ffa1a..bd2862b 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -13,13 +13,11 @@ # limitations under the License. from datetime import datetime -import logging from airflow.models import BaseOperator, DagBag from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow import settings -from airflow import configuration as conf class DagRunOrder(object): @@ -71,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator): state=State.RUNNING, conf=dro.payload, external_trigger=True) - logging.info("Creating DagRun {}".format(dr)) + self.logger.info("Creating DagRun %s", dr) session.add(dr) session.commit() session.close() else: - logging.info("Criteria not met, moving on") + self.logger.info("Criteria not met, moving on") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/docker_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index ddcc97b..8a333d6 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -13,7 +13,7 @@ # limitations under the License. 
import json -import logging + from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -134,7 +134,7 @@ class DockerOperator(BaseOperator): self.container = None def execute(self, context): - logging.info('Starting docker container from image ' + self.image) + self.logger.info('Starting docker container from image %s', self.image) tls_config = None if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key: @@ -155,10 +155,10 @@ class DockerOperator(BaseOperator): image = self.image if self.force_pull or len(self.cli.images(name=image)) == 0: - logging.info('Pulling docker image ' + image) + self.logger.info('Pulling docker image %s', image) for l in self.cli.pull(image, stream=True): output = json.loads(l.decode('utf-8')) - logging.info("{}".format(output['status'])) + self.logger.info("%s", output['status']) cpu_shares = int(round(self.cpus * 1024)) @@ -184,7 +184,7 @@ class DockerOperator(BaseOperator): line = line.strip() if hasattr(line, 'decode'): line = line.decode('utf-8') - logging.info(line) + self.logger.info(line) exit_code = self.cli.wait(self.container['Id']) if exit_code != 0: @@ -202,5 +202,5 @@ class DockerOperator(BaseOperator): def on_kill(self): if self.cli is not None: - logging.info('Stopping docker container') + self.logger.info('Stopping docker container') self.cli.stop(self.container['Id']) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/generic_transfer.py ---------------------------------------------------------------------- diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py index de3bf73..790749a 100644 --- a/airflow/operators/generic_transfer.py +++ b/airflow/operators/generic_transfer.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.hooks.base_hook import BaseHook @@ -64,15 +61,15 @@ class GenericTransfer(BaseOperator): def execute(self, context): source_hook = BaseHook.get_hook(self.source_conn_id) - logging.info("Extracting data from {}".format(self.source_conn_id)) - logging.info("Executing: \n" + self.sql) + self.logger.info("Extracting data from %s", self.source_conn_id) + self.logger.info("Executing: \n %s", self.sql) results = source_hook.get_records(self.sql) destination_hook = BaseHook.get_hook(self.destination_conn_id) if self.preoperator: - logging.info("Running preoperator") - logging.info(self.preoperator) + self.logger.info("Running preoperator") + self.logger.info(self.preoperator) destination_hook.run(self.preoperator) - logging.info("Inserting rows into {}".format(self.destination_conn_id)) + self.logger.info("Inserting rows into %s", self.destination_conn_id) destination_hook.insert_rows(table=self.destination_table, rows=results) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 06a83e3..983069b 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. - -import logging import re from airflow.hooks.hive_hooks import HiveCliHook @@ -95,7 +93,7 @@ class HiveOperator(BaseOperator): self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:]) def execute(self, context): - logging.info('Executing: ' + self.hql) + self.logger.info('Executing: %s', self.hql) self.hook = self.get_hook() self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=context_to_airflow_vars(context)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_stats_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index b31c6b5..025e427 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -12,11 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from builtins import str from builtins import zip from collections import OrderedDict import json -import logging from airflow.exceptions import AirflowException from airflow.hooks.mysql_hook import MySqlHook @@ -141,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator): """.format(**locals()) hook = PrestoHook(presto_conn_id=self.presto_conn_id) - logging.info('Executing SQL check: ' + sql) + self.logger.info('Executing SQL check: %s', sql) row = hook.get_first(hql=sql) - logging.info("Record: " + str(row)) + self.logger.info("Record: %s", row) if not row: raise AirflowException("The query returned None") part_json = json.dumps(self.partition, sort_keys=True) - logging.info("Deleting rows from previous runs if they exist") + self.logger.info("Deleting rows from previous runs if they exist") mysql = MySqlHook(self.mysql_conn_id) sql = """ SELECT 1 FROM hive_stats @@ -169,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator): """.format(**locals()) mysql.run(sql) - logging.info("Pivoting and loading cells into the Airflow db") + self.logger.info("Pivoting and loading cells into the Airflow db") rows = [ (self.ds, self.dttm, self.table, part_json) + (r[0][0], r[0][1], r[1]) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_druid.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index 70d7825..7ac0b02 100644 --- a/airflow/operators/hive_to_druid.py +++ b/airflow/operators/hive_to_druid.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook from airflow.hooks.druid_hook import DruidHook from airflow.models import BaseOperator @@ -90,7 +87,7 @@ class HiveToDruidTransfer(BaseOperator): def execute(self, context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) - logging.info("Extracting data from Hive") + self.logger.info("Extracting data from Hive") hive_table = 'druid.' 
+ context['task_instance_key_str'].replace('.', '_') sql = self.sql.strip().strip(';') hql = """\ @@ -104,7 +101,7 @@ class HiveToDruidTransfer(BaseOperator): AS {sql} """.format(**locals()) - logging.info("Running command:\n {}".format(hql)) + self.logger.info("Running command:\n %s", hql) hive.run_cli(hql) m = HiveMetastoreHook(self.metastore_conn_id) @@ -128,15 +125,16 @@ class HiveToDruidTransfer(BaseOperator): columns=columns, ) - logging.info("Inserting rows into Druid, hdfs path: {}".format(static_path)) + self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path) druid.submit_indexing_job(index_spec) - logging.info("Load seems to have succeeded!") + self.logger.info("Load seems to have succeeded!") finally: - logging.info( - "Cleaning up by dropping the temp " - "Hive table {}".format(hive_table)) + self.logger.info( + "Cleaning up by dropping the temp Hive table %s", + hive_table + ) hql = "DROP TABLE IF EXISTS {}".format(hive_table) hive.run_cli(hql) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_mysql.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py index 4a64749..e82a099 100644 --- a/airflow/operators/hive_to_mysql.py +++ b/airflow/operators/hive_to_mysql.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.hive_hooks import HiveServer2Hook from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator @@ -80,8 +77,7 @@ class HiveToMySqlTransfer(BaseOperator): def execute(self, context): hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) - logging.info("Extracting data from Hive") - logging.info(self.sql) + self.logger.info("Extracting data from Hive: %s", self.sql) if self.bulk_load: tmpfile = NamedTemporaryFile() @@ -92,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator): mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) if self.mysql_preoperator: - logging.info("Running MySQL preoperator") + self.logger.info("Running MySQL preoperator") mysql.run(self.mysql_preoperator) - logging.info("Inserting rows into MySQL") + self.logger.info("Inserting rows into MySQL") if self.bulk_load: mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name) @@ -104,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator): mysql.insert_rows(table=self.mysql_table, rows=results) if self.mysql_postoperator: - logging.info("Running MySQL postoperator") + self.logger.info("Running MySQL postoperator") mysql.run(self.mysql_postoperator) - logging.info("Done.") + self.logger.info("Done.") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_samba_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py index 8f18dd9..d6e6dec 100644 --- a/airflow/operators/hive_to_samba_operator.py +++ b/airflow/operators/hive_to_samba_operator.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging import tempfile from airflow.hooks.hive_hooks import HiveServer2Hook @@ -55,7 +53,7 @@ class Hive2SambaOperator(BaseOperator): samba = SambaHook(samba_conn_id=self.samba_conn_id) hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) tmpfile = tempfile.NamedTemporaryFile() - logging.info("Fetching file from Hive") + self.logger.info("Fetching file from Hive") hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name) - logging.info("Pushing to samba") + self.logger.info("Pushing to samba") samba.push_from_local(self.destination_filepath, tmpfile.name) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py index 9884566..d92c931 100644 --- a/airflow/operators/http_operator.py +++ b/airflow/operators/http_operator.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.exceptions import AirflowException from airflow.hooks.http_hook import HttpHook from airflow.models import BaseOperator @@ -45,7 +43,7 @@ class SimpleHttpOperator(BaseOperator): depends on the option that's being modified. """ - template_fields = ('endpoint','data',) + template_fields = ('endpoint', 'data',) template_ext = () ui_color = '#f4a460' @@ -75,7 +73,9 @@ class SimpleHttpOperator(BaseOperator): def execute(self, context): http = HttpHook(self.method, http_conn_id=self.http_conn_id) - logging.info("Calling HTTP method") + + self.logger.info("Calling HTTP method") + response = http.run(self.endpoint, self.data, self.headers, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/jdbc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 28977db..942e312 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -11,11 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -__author__ = 'janomar' - -import logging - from airflow.hooks.jdbc_hook import JdbcHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -60,6 +55,6 @@ class JdbcOperator(BaseOperator): self.autocommit = autocommit def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id) self.hook.run(self.sql, self.autocommit, parameters=self.parameters) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/latest_only_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py index 909a211..58f7e67 100644 --- a/airflow/operators/latest_only_operator.py +++ b/airflow/operators/latest_only_operator.py @@ -13,7 +13,6 @@ # limitations under the License. 
import datetime -import logging from airflow.models import BaseOperator, SkipMixin @@ -33,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin): # If the DAG Run is externally triggered, then return without # skipping downstream tasks if context['dag_run'] and context['dag_run'].external_trigger: - logging.info("""Externally triggered DAG_Run: - allowing execution to proceed.""") + self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.") return now = datetime.datetime.now() left_window = context['dag'].following_schedule( context['execution_date']) right_window = context['dag'].following_schedule(left_window) - logging.info( - 'Checking latest only with left_window: %s right_window: %s ' - 'now: %s', left_window, right_window, now) + self.logger.info( + 'Checking latest only with left_window: %s right_window: %s now: %s', + left_window, right_window, now + ) if not left_window < now <= right_window: - logging.info('Not latest execution, skipping downstream.') + self.logger.info('Not latest execution, skipping downstream.') downstream_tasks = context['task'].get_flat_relatives(upstream=False) - logging.debug("Downstream task_ids {}".format(downstream_tasks)) + self.logger.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) - logging.info('Done.') + self.logger.info('Done.') else: - logging.info('Latest, allowing execution to proceed.') + self.logger.info('Latest, allowing execution to proceed.') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mssql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py index 9ae2fff..bc0822f 100644 --- a/airflow/operators/mssql_operator.py +++ b/airflow/operators/mssql_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging - from airflow.hooks.mssql_hook import MsSqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -47,7 +44,7 @@ class MsSqlOperator(BaseOperator): self.database = database def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database) hook.run(self.sql, autocommit=self.autocommit, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mssql_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py index a0a2e10..719ddd2 100644 --- a/airflow/operators/mssql_to_hive.py +++ b/airflow/operators/mssql_to_hive.py @@ -15,11 +15,9 @@ from builtins import chr from collections import OrderedDict import unicodecsv as csv -import logging from tempfile import NamedTemporaryFile import pymssql - from airflow.hooks.hive_hooks import HiveCliHook from airflow.hooks.mssql_hook import MsSqlHook from airflow.models import BaseOperator @@ -104,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id) - logging.info("Dumping Microsoft SQL Server query results to local file") + self.logger.info("Dumping Microsoft SQL Server query results to local file") conn = mssql.get_conn() cursor = conn.cursor() cursor.execute(self.sql) @@ -120,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator): f.flush() cursor.close() conn.close() - logging.info("Loading file into Hive") + self.logger.info("Loading file into Hive") hive.load_file( f.name, self.hive_table, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mysql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py index 156ada8..923eaf8 100644 --- a/airflow/operators/mysql_operator.py +++ b/airflow/operators/mysql_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging - from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -49,7 +46,7 @@ class MySqlOperator(BaseOperator): self.database = database def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database) hook.run( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mysql_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py index ad3ecae..fde92b5 100644 --- a/airflow/operators/mysql_to_hive.py +++ b/airflow/operators/mysql_to_hive.py @@ -15,7 +15,6 @@ from builtins import chr from collections import OrderedDict import unicodecsv as csv -import logging from tempfile import NamedTemporaryFile import MySQLdb @@ -111,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) - logging.info("Dumping MySQL query results to local file") + self.logger.info("Dumping MySQL query results to local file") conn = mysql.get_conn() cursor = conn.cursor() cursor.execute(self.sql) @@ -124,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator): f.flush() cursor.close() conn.close() - logging.info("Loading file into Hive") + self.logger.info("Loading file into Hive") hive.load_file( f.name, self.hive_table, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/oracle_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py index ab7bdb2..f87bbf9 100644 --- a/airflow/operators/oracle_operator.py +++ b/airflow/operators/oracle_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.oracle_hook import OracleHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -45,7 +42,7 @@ class OracleOperator(BaseOperator): self.parameters = parameters def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) hook = OracleHook(oracle_conn_id=self.oracle_conn_id) hook.run( self.sql, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/pig_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py index 4a21ecc..cdce48a 100644 --- a/airflow/operators/pig_operator.py +++ b/airflow/operators/pig_operator.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging import re from airflow.hooks.pig_hook import PigCliHook @@ -61,7 +59,7 @@ class PigOperator(BaseOperator): "(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig) def execute(self, context): - logging.info('Executing: ' + self.pig) + self.logger.info('Executing: %s', self.pig) self.hook = self.get_hook() self.hook.run_cli(pig=self.pig) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/postgres_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py index 0de5aa5..55c1573 100644 --- a/airflow/operators/postgres_operator.py +++ b/airflow/operators/postgres_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.postgres_hook import PostgresHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -52,7 +49,7 @@ class PostgresOperator(BaseOperator): self.database = database def execute(self, context): - logging.info('Executing: ' + str(self.sql)) + self.logger.info('Executing: %s', self.sql) self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database) self.hook.run(self.sql, self.autocommit, parameters=self.parameters) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/presto_to_mysql.py ---------------------------------------------------------------------- diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py index 7ff2ad6..48158ca 100644 --- a/airflow/operators/presto_to_mysql.py +++ b/airflow/operators/presto_to_mysql.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.presto_hook import PrestoHook from airflow.hooks.mysql_hook import MySqlHook from airflow.models import BaseOperator @@ -64,15 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator): def execute(self, context): presto = PrestoHook(presto_conn_id=self.presto_conn_id) - logging.info("Extracting data from Presto") - logging.info(self.sql) + self.logger.info("Extracting data from Presto: %s", self.sql) results = presto.get_records(self.sql) mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) if self.mysql_preoperator: - logging.info("Running MySQL preoperator") - logging.info(self.mysql_preoperator) + self.logger.info("Running MySQL preoperator") + self.logger.info(self.mysql_preoperator) mysql.run(self.mysql_preoperator) - logging.info("Inserting rows into MySQL") + self.logger.info("Inserting rows into MySQL") mysql.insert_rows(table=self.mysql_table, rows=results) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index bef9bb0..552996f 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -11,10 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -from builtins import str -import logging - from airflow.exceptions import AirflowException from airflow.models import BaseOperator, SkipMixin from airflow.utils.decorators import apply_defaults @@ -78,7 +74,7 @@ class PythonOperator(BaseOperator): self.op_kwargs = context return_value = self.python_callable(*self.op_args, **self.op_kwargs) - logging.info("Done. Returned value was: " + str(return_value)) + self.logger.info("Done. Returned value was: %s", return_value) return return_value @@ -103,17 +99,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin): """ def execute(self, context): branch = super(BranchPythonOperator, self).execute(context) - logging.info("Following branch {}".format(branch)) - logging.info("Marking other directly downstream tasks as skipped") + self.logger.info("Following branch %s", branch) + self.logger.info("Marking other directly downstream tasks as skipped") downstream_tasks = context['task'].downstream_list - logging.debug("Downstream task_ids {}".format(downstream_tasks)) + self.logger.debug("Downstream task_ids %s", downstream_tasks) skip_tasks = [t for t in downstream_tasks if t.task_id != branch] if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks) - logging.info("Done.") + self.logger.info("Done.") class ShortCircuitOperator(PythonOperator, SkipMixin): @@ -130,18 +126,18 @@ class ShortCircuitOperator(PythonOperator, SkipMixin): """ def execute(self, context): condition = super(ShortCircuitOperator, self).execute(context) - logging.info("Condition result is {}".format(condition)) + self.logger.info("Condition result is %s", condition) if condition: - logging.info('Proceeding with downstream tasks...') + self.logger.info('Proceeding with downstream tasks...') return - logging.info('Skipping downstream tasks...') + self.logger.info('Skipping downstream tasks...') downstream_tasks = context['task'].get_flat_relatives(upstream=False) - logging.debug("Downstream task_ids {}".format(downstream_tasks)) + self.logger.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) - logging.info("Done.") + self.logger.info("Done.") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/redshift_to_s3_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index fda88d9..e25d613 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.postgres_hook import PostgresHook from airflow.hooks.S3_hook import S3Hook from airflow.models import BaseOperator @@ -71,9 +68,9 @@ class RedshiftToS3Transfer(BaseOperator): self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) self.s3 = S3Hook(s3_conn_id=self.s3_conn_id) a_key, s_key = self.s3.get_credentials() - unload_options = ('\n\t\t\t').join(self.unload_options) + unload_options = '\n\t\t\t'.join(self.unload_options) - logging.info("Retrieving headers from %s.%s..." 
% (self.schema, self.table)) + self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table) columns_query = """SELECT column_name FROM information_schema.columns @@ -86,9 +83,9 @@ class RedshiftToS3Transfer(BaseOperator): cursor.execute(columns_query) rows = cursor.fetchall() columns = map(lambda row: row[0], rows) - column_names = (', ').join(map(lambda c: "\\'{0}\\'".format(c), columns)) - column_castings = (', ').join(map(lambda c: "CAST({0} AS text) AS {0}".format(c), - columns)) + column_names = ', '.join(map(lambda c: "\\'{0}\\'".format(c), columns)) + column_castings = ', '.join(map(lambda c: "CAST({0} AS text) AS {0}".format(c), + columns)) unload_query = """ UNLOAD ('SELECT {0} @@ -102,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator): """.format(column_names, column_castings, self.schema, self.table, self.s3_bucket, self.s3_key, a_key, s_key, unload_options) - logging.info('Executing UNLOAD command...') + self.logger.info('Executing UNLOAD command...') self.hook.run(unload_query, self.autocommit) - logging.info("UNLOAD command complete...") + self.logger.info("UNLOAD command complete...") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/s3_file_transform_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index 1cdd0e5..5de5127 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from tempfile import NamedTemporaryFile import subprocess @@ -75,15 +74,15 @@ class S3FileTransformOperator(BaseOperator): def execute(self, context): source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id) dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id) - logging.info("Downloading source S3 file {0}" - "".format(self.source_s3_key)) + self.logger.info("Downloading source S3 file %s", self.source_s3_key) if not source_s3.check_for_key(self.source_s3_key): - raise AirflowException("The source key {0} does not exist" - "".format(self.source_s3_key)) + raise AirflowException("The source key {0} does not exist".format(self.source_s3_key)) source_s3_key_object = source_s3.get_key(self.source_s3_key) with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest: - logging.info("Dumping S3 file {0} contents to local file {1}" - "".format(self.source_s3_key, f_source.name)) + self.logger.info( + "Dumping S3 file %s contents to local file %s", + self.source_s3_key, f_source.name + ) source_s3_key_object.get_contents_to_file(f_source) f_source.flush() source_s3.connection.close() @@ -91,21 +90,20 @@ class S3FileTransformOperator(BaseOperator): [self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate() - logging.info("Transform script stdout " - "" + transform_script_stdoutdata) + self.logger.info("Transform script stdout %s", transform_script_stdoutdata) if transform_script_process.returncode > 0: - raise AirflowException("Transform script failed " - "" + transform_script_stderrdata) + raise AirflowException("Transform script failed %s", transform_script_stderrdata) else: - logging.info("Transform script successful." 
- "Output temporarily located at {0}" - "".format(f_dest.name)) - logging.info("Uploading transformed file to S3") + self.logger.info( + "Transform script successful. Output temporarily located at %s", + f_dest.name + ) + self.logger.info("Uploading transformed file to S3") f_dest.flush() dest_s3.load_file( filename=f_dest.name, key=self.dest_s3_key, replace=self.replace ) - logging.info("Upload successful") + self.logger.info("Upload successful") dest_s3.connection.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/s3_to_hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 7ae0616..68fe903 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -14,7 +14,6 @@ from builtins import next from builtins import zip -import logging from tempfile import NamedTemporaryFile from airflow.utils.file import TemporaryDirectory import gzip @@ -29,6 +28,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.compression import uncompress_file + class S3ToHiveTransfer(BaseOperator): """ Moves data from S3 to Hive. The operator downloads a file from S3, @@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator): # Downloading file from S3 self.s3 = S3Hook(s3_conn_id=self.s3_conn_id) self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) - logging.info("Downloading S3 file") + self.logger.info("Downloading S3 file") if self.wildcard_match: if not self.s3.check_for_wildcard_key(self.s3_key): @@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator): NamedTemporaryFile(mode="w", dir=tmp_dir, suffix=file_ext) as f: - logging.info("Dumping S3 key {0} contents to local" - " file {1}".format(s3_key_object.key, f.name)) + self.logger.info("Dumping S3 key {0} contents to local file {1}" + .format(s3_key_object.key, f.name)) s3_key_object.get_contents_to_file(f) f.flush() self.s3.connection.close() if not self.headers: - logging.info("Loading file {0} into Hive".format(f.name)) + self.logger.info("Loading file %s into Hive", f.name) self.hive.load_file( f.name, self.hive_table, @@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator): else: # Decompressing file if self.input_compressed: - logging.info("Uncompressing file {0}".format(f.name)) + self.logger.info("Uncompressing file %s", f.name) fn_uncompressed = uncompress_file(f.name, file_ext, tmp_dir) - logging.info("Uncompressed to {0}".format(fn_uncompressed)) + self.logger.info("Uncompressed to %s", fn_uncompressed) # uncompressed file available now so deleting # compressed file to save disk space f.close() @@ -178,20 +178,19 @@ class S3ToHiveTransfer(BaseOperator): # Testing if header matches field_dict if self.check_headers: - logging.info("Matching file header against field_dict") + self.logger.info("Matching file header against field_dict") header_list = self._get_top_row_as_list(fn_uncompressed) if not self._match_headers(header_list): raise AirflowException("Header check failed") # Deleting top header row - logging.info("Removing header from file {0}". 
- format(fn_uncompressed)) + self.logger.info("Removing header from file %s", fn_uncompressed) headless_file = ( self._delete_top_row_and_compress(fn_uncompressed, file_ext, tmp_dir)) - logging.info("Headless file {0}".format(headless_file)) - logging.info("Loading file {0} into Hive".format(headless_file)) + self.logger.info("Headless file %s", headless_file) + self.logger.info("Loading file %s into Hive", headless_file) self.hive.load_file(headless_file, self.hive_table, field_dict=self.field_dict, @@ -212,18 +211,18 @@ class S3ToHiveTransfer(BaseOperator): raise AirflowException("Unable to retrieve header row from file") field_names = self.field_dict.keys() if len(field_names) != len(header_list): - logging.warning("Headers count mismatch" - "File headers:\n {header_list}\n" - "Field names: \n {field_names}\n" - "".format(**locals())) + self.logger.warning("Headers count mismatch" + "File headers:\n {header_list}\n" + "Field names: \n {field_names}\n" + "".format(**locals())) return False test_field_match = [h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names)] if not all(test_field_match): - logging.warning("Headers do not match field names" - "File headers:\n {header_list}\n" - "Field names: \n {field_names}\n" - "".format(**locals())) + self.logger.warning("Headers do not match field names" + "File headers:\n {header_list}\n" + "Field names: \n {field_names}\n" + "".format(**locals())) return False else: return True http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 409c18d..ea301dc 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -14,12 +14,14 @@ from __future__ import print_function from future import standard_library + +from airflow.utils.log.LoggingMixin import LoggingMixin + standard_library.install_aliases() from builtins import str from past.builtins import basestring from datetime import datetime -import logging from urllib.parse import urlparse from time import sleep import re @@ -80,7 +82,7 @@ class BaseSensorOperator(BaseOperator): else: raise AirflowSensorTimeout('Snap. Time is OUT.') sleep(self.poke_interval) - logging.info("Success criteria met. Exiting.") + self.logger.info("Success criteria met. Exiting.") class SqlSensor(BaseSensorOperator): @@ -106,7 +108,7 @@ class SqlSensor(BaseSensorOperator): def poke(self, context): hook = BaseHook.get_connection(self.conn_id).get_hook() - logging.info('Poking: ' + self.sql) + self.logger.info('Poking: %s', self.sql) records = hook.get_records(self.sql) if not records: return False @@ -235,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator): serialized_dttm_filter = ','.join( [datetime.isoformat() for datetime in dttm_filter]) - logging.info( + self.logger.info( 'Poking for ' '{self.external_dag_id}.' '{self.external_task_id} on ' @@ -311,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator): schema, table, partition = self.parse_partition_name(partition) - logging.info( + self.logger.info( 'Poking for {schema}.{table}/{partition}'.format(**locals()) ) return self.hook.check_for_named_partition( @@ -369,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator): def poke(self, context): if '.' 
in self.table: self.schema, self.table = self.table.split('.') - logging.info( + self.logger.info( 'Poking for table {self.schema}.{self.table}, ' 'partition {self.partition}'.format(**locals())) if not hasattr(self, 'hook'): @@ -415,10 +417,11 @@ class HdfsSensor(BaseSensorOperator): :return: (bool) depending on the matching criteria """ if size: - logging.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result)) + log = LoggingMixin().logger + log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result)) size *= settings.MEGABYTE result = [x for x in result if x['length'] >= size] - logging.debug('HdfsSensor.poke: after size filter result is %s', result) + log.debug('HdfsSensor.poke: after size filter result is %s', result) return result @staticmethod @@ -432,31 +435,33 @@ class HdfsSensor(BaseSensorOperator): :return: (list) of dicts which were not removed """ if ignore_copying: + log = LoggingMixin().logger regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext) ignored_extentions_regex = re.compile(regex_builder) - logging.debug('Filtering result for ignored extentions: %s in files %s', ignored_extentions_regex.pattern, - map(lambda x: x['path'], result)) + log.debug( + 'Filtering result for ignored extensions: %s in files %s', + ignored_extentions_regex.pattern, map(lambda x: x['path'], result) + ) result = [x for x in result if not ignored_extentions_regex.match(x['path'])] - logging.debug('HdfsSensor.poke: after ext filter result is %s', result) + log.debug('HdfsSensor.poke: after ext filter result is %s', result) return result def poke(self, context): sb = self.hook(self.hdfs_conn_id).get_conn() - logging.getLogger("snakebite").setLevel(logging.WARNING) - logging.info('Poking for file {self.filepath} '.format(**locals())) + self.logger.info('Poking for file {self.filepath}'.format(**locals())) try: # IMOO it's not right here, as there no raise of any kind. 
# if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*', # it's not correct as the directory exists and sb does not raise any error # here is a quick fix result = [f for f in sb.ls([self.filepath], include_toplevel=False)] - logging.debug('HdfsSensor.poke: result is %s', result) + self.logger.debug('HdfsSensor.poke: result is %s', result) result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) result = self.filter_for_filesize(result, self.file_size) return bool(result) except: e = sys.exc_info() - logging.debug("Caught an exception !: %s", str(e)) + self.logger.debug("Caught an exception !: %s", str(e)) return False @@ -479,8 +484,7 @@ class WebHdfsSensor(BaseSensorOperator): def poke(self, context): from airflow.hooks.webhdfs_hook import WebHDFSHook c = WebHDFSHook(self.webhdfs_conn_id) - logging.info( - 'Poking for file {self.filepath} '.format(**locals())) + self.logger.info('Poking for file {self.filepath}'.format(**locals())) return c.check_for_path(hdfs_path=self.filepath) @@ -531,7 +535,7 @@ class S3KeySensor(BaseSensorOperator): from airflow.hooks.S3_hook import S3Hook hook = S3Hook(s3_conn_id=self.s3_conn_id) full_url = "s3://" + self.bucket_name + "/" + self.bucket_key - logging.info('Poking for key : {full_url}'.format(**locals())) + self.logger.info('Poking for key : {full_url}'.format(**locals())) if self.wildcard_match: return hook.check_for_wildcard_key(self.bucket_key, self.bucket_name) @@ -573,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator): self.s3_conn_id = s3_conn_id def poke(self, context): - logging.info('Poking for prefix : {self.prefix}\n' + self.logger.info('Poking for prefix : {self.prefix}\n' 'in bucket s3://{self.bucket_name}'.format(**locals())) from airflow.hooks.S3_hook import S3Hook hook = S3Hook(s3_conn_id=self.s3_conn_id) @@ -598,8 +602,7 @@ class TimeSensor(BaseSensorOperator): self.target_time = target_time def poke(self, context): - logging.info( - 'Checking if the time ({0}) has come'.format(self.target_time)) + self.logger.info('Checking if the time (%s) has come', self.target_time) return datetime.now().time() > self.target_time @@ -624,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator): dag = context['dag'] target_dttm = dag.following_schedule(context['execution_date']) target_dttm += self.delta - logging.info('Checking if the time ({0}) has come'.format(target_dttm)) + self.logger.info('Checking if the time (%s) has come', target_dttm) return datetime.now() > target_dttm @@ -676,7 +679,7 @@ class HttpSensor(BaseSensorOperator): http_conn_id=http_conn_id) def poke(self, context): - logging.info('Poking: ' + self.endpoint) + self.logger.info('Poking: %s', self.endpoint) try: response = self.hook.run(self.endpoint, data=self.request_params, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/slack_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py index 86659d9..4f2d7bc 100644 --- a/airflow/operators/slack_operator.py +++ b/airflow/operators/slack_operator.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json + from slackclient import SlackClient from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.exceptions import AirflowException -import json -import logging class SlackAPIOperator(BaseOperator): @@ -66,8 +66,9 @@ class SlackAPIOperator(BaseOperator): sc = SlackClient(self.token) rc = sc.api_call(self.method, **self.api_params) if not rc['ok']: - logging.error("Slack API call failed ({})".format(rc['error'])) - raise AirflowException("Slack API call failed: ({})".format(rc['error'])) + msg = "Slack API call failed ({})".format(rc['error']) + self.logger.error(msg) + raise AirflowException(msg) class SlackAPIPostOperator(SlackAPIOperator): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/sqlite_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py index 0ff4d05..7c85847 100644 --- a/airflow/operators/sqlite_operator.py +++ b/airflow/operators/sqlite_operator.py @@ -11,9 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import logging - from airflow.hooks.sqlite_hook import SqliteHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -44,6 +41,6 @@ class SqliteOperator(BaseOperator): self.parameters = parameters or [] def execute(self, context): - logging.info('Executing: ' + self.sql) + self.logger.info('Executing: %s', self.sql) hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id) hook.run(self.sql, parameters=self.parameters)
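
Taken together, the hunks above follow a single pattern: drop the module-level `logging` import and log through the `logger` exposed by `airflow.utils.log.LoggingMixin.LoggingMixin`, either as `self.logger` on classes whose hierarchy mixes it in (as the operator hunks rely on), or as `LoggingMixin().logger` where no instance is available, as in the `HdfsSensor` static filters. Below is a minimal sketch of that pattern, assuming this revision of Airflow is importable; `ExampleOperator` and `filter_paths` are illustrative names, not part of the patch.

from airflow.models import BaseOperator
from airflow.utils.log.LoggingMixin import LoggingMixin


class ExampleOperator(BaseOperator):
    def execute(self, context):
        # self.logger is provided through LoggingMixin in the operator
        # hierarchy, so no module-level `import logging` is needed here.
        self.logger.info("Executing with context keys: %s", list(context))


def filter_paths(paths, suffix):
    # In static methods or module-level helpers there is no self, so the
    # patch instantiates the mixin directly to obtain a logger.
    log = LoggingMixin().logger
    log.debug("Filtering %s paths for suffix %s", len(paths), suffix)
    return [p for p in paths if p.endswith(suffix)]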

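One recurring detail in the converted call sites is the move from eager string building (`"..." + str(x)` or `str.format`) to %-style logger arguments. A standalone, stdlib-only illustration of the difference; the names and values are made up for the example.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("example")

return_value = {"rows": 42}

# Eager: the message string is built even though DEBUG is disabled here.
log.debug("Done. Returned value was: " + str(return_value))

# Lazy: interpolation is deferred until a handler actually emits the record,
# and the raw template plus arguments stay available to logging filters.
log.debug("Done. Returned value was: %s", return_value)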