http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index f690fb4..28dcc04 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -77,7 +77,7 @@ from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin Base = declarative_base() ID_LEN = 250 @@ -184,7 +184,7 @@ class DagBag(BaseDagBag, LoggingMixin): if executor is None: executor = GetDefaultExecutor() dag_folder = dag_folder or settings.DAGS_FOLDER - self.logger.info("Filling up the DagBag from %s", dag_folder) + self.log.info("Filling up the DagBag from %s", dag_folder) self.dag_folder = dag_folder self.dags = {} # the file's last modified timestamp when we last read it @@ -257,7 +257,7 @@ class DagBag(BaseDagBag, LoggingMixin): return found_dags except Exception as e: - self.logger.exception(e) + self.log.exception(e) return found_dags mods = [] @@ -269,7 +269,7 @@ class DagBag(BaseDagBag, LoggingMixin): self.file_last_changed[filepath] = file_last_changed_on_disk return found_dags - self.logger.debug("Importing %s", filepath) + self.log.debug("Importing %s", filepath) org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1]) mod_name = ('unusual_prefix_' + hashlib.sha1(filepath.encode('utf-8')).hexdigest() + @@ -283,7 +283,7 @@ class DagBag(BaseDagBag, LoggingMixin): m = imp.load_source(mod_name, filepath) mods.append(m) except Exception as e: - self.logger.exception("Failed to import: %s", filepath) + self.log.exception("Failed to import: %s", filepath) self.import_errors[filepath] = str(e) self.file_last_changed[filepath] = file_last_changed_on_disk @@ -294,10 +294,10 @@ class DagBag(BaseDagBag, LoggingMixin): mod_name, ext = os.path.splitext(mod.filename) if not head and (ext == '.py' or ext == '.pyc'): if mod_name == '__init__': - self.logger.warning("Found __init__.%s at root of %s", ext, filepath) + self.log.warning("Found __init__.%s at root of %s", ext, filepath) if safe_mode: with zip_file.open(mod.filename) as zf: - self.logger.debug("Reading %s from %s", mod.filename, filepath) + self.log.debug("Reading %s from %s", mod.filename, filepath) content = zf.read() if not all([s in content for s in (b'DAG', b'airflow')]): self.file_last_changed[filepath] = ( @@ -313,7 +313,7 @@ class DagBag(BaseDagBag, LoggingMixin): m = importlib.import_module(mod_name) mods.append(m) except Exception as e: - self.logger.exception("Failed to import: %s", filepath) + self.log.exception("Failed to import: %s", filepath) self.import_errors[filepath] = str(e) self.file_last_changed[filepath] = file_last_changed_on_disk @@ -336,11 +336,11 @@ class DagBag(BaseDagBag, LoggingMixin): Fails tasks that haven't had a heartbeat in too long """ from airflow.jobs import LocalTaskJob as LJ - self.logger.info("Finding 'running' jobs without a recent heartbeat") + self.log.info("Finding 'running' jobs without a recent heartbeat") TI = TaskInstance secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold') limit_dttm = datetime.now() - timedelta(seconds=secs) - self.logger.info("Failing jobs without heartbeat after %s", limit_dttm) + self.log.info("Failing jobs without heartbeat after %s", limit_dttm) tis = ( session.query(TI) @@ -361,7 +361,7 @@ class DagBag(BaseDagBag, LoggingMixin): task = dag.get_task(ti.task_id) ti.task = task ti.handle_failure("{} killed as zombie".format(str(ti))) - self.logger.info('Marked zombie job %s as failed', ti) + self.log.info('Marked zombie job %s as failed', ti) Stats.incr('zombies_killed') session.commit() @@ -381,7 +381,7 @@ class DagBag(BaseDagBag, LoggingMixin): subdag.parent_dag = dag subdag.is_subdag = True self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag) - self.logger.debug('Loaded DAG {dag}'.format(**locals())) + self.log.debug('Loaded DAG {dag}'.format(**locals())) def collect_dags( self, @@ -439,7 +439,7 @@ class DagBag(BaseDagBag, LoggingMixin): str([dag.dag_id for dag in found_dags]), )) except Exception as e: - self.logger.warning(e) + self.log.warning(e) Stats.gauge( 'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1) Stats.gauge( @@ -606,7 +606,7 @@ class Connection(Base, LoggingMixin): self._password = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = True except AirflowException: - self.logger.exception("Failed to load fernet while encrypting value, " + self.log.exception("Failed to load fernet while encrypting value, " "using non-encrypted value.") self._password = value self.is_encrypted = False @@ -635,7 +635,7 @@ class Connection(Base, LoggingMixin): self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_extra_encrypted = True except AirflowException: - self.logger.exception("Failed to load fernet while encrypting value, " + self.log.exception("Failed to load fernet while encrypting value, " "using non-encrypted value.") self._extra = value self.is_extra_encrypted = False @@ -706,8 +706,8 @@ class Connection(Base, LoggingMixin): try: obj = json.loads(self.extra) except Exception as e: - self.logger.exception(e) - self.logger.error("Failed parsing the json for conn_id %s", self.conn_id) + self.log.exception(e) + self.log.error("Failed parsing the json for conn_id %s", self.conn_id) return obj @@ -1001,7 +1001,7 @@ class TaskInstance(Base, LoggingMixin): """ Forces the task instance's state to FAILED in the database. """ - self.logger.error("Recording the task instance as FAILED") + self.log.error("Recording the task instance as FAILED") self.state = State.FAILED session.merge(self) session.commit() @@ -1152,7 +1152,7 @@ class TaskInstance(Base, LoggingMixin): session=session): failed = True if verbose: - self.logger.info( + self.log.info( "Dependencies not met for %s, dependency '%s' FAILED: %s", self, dep_status.dep_name, dep_status.reason ) @@ -1161,7 +1161,7 @@ class TaskInstance(Base, LoggingMixin): return False if verbose: - self.logger.info("Dependencies all met for %s", self) + self.log.info("Dependencies all met for %s", self) return True @@ -1177,7 +1177,7 @@ class TaskInstance(Base, LoggingMixin): session, dep_context): - self.logger.debug( + self.log.debug( "%s dependency '%s' PASSED: %s, %s", self, dep_status.dep_name, dep_status.passed, dep_status.reason ) @@ -1354,10 +1354,10 @@ class TaskInstance(Base, LoggingMixin): "runtime. Attempt {attempt} of {total}. State set to NONE.").format( attempt=self.try_number + 1, total=self.max_tries + 1) - self.logger.warning(hr + msg + hr) + self.log.warning(hr + msg + hr) self.queued_dttm = datetime.now() - self.logger.info("Queuing into pool %s", self.pool) + self.log.info("Queuing into pool %s", self.pool) session.merge(self) session.commit() return False @@ -1366,12 +1366,12 @@ class TaskInstance(Base, LoggingMixin): # the current worker process was blocked on refresh_from_db if self.state == State.RUNNING: msg = "Task Instance already running {}".format(self) - self.logger.warning(msg) + self.log.warning(msg) session.commit() return False # print status message - self.logger.info(hr + msg + hr) + self.log.info(hr + msg + hr) self.try_number += 1 if not test_mode: @@ -1389,10 +1389,10 @@ class TaskInstance(Base, LoggingMixin): if verbose: if mark_success: msg = "Marking success for {} on {}".format(self.task, self.execution_date) - self.logger.info(msg) + self.log.info(msg) else: msg = "Executing {} on {}".format(self.task, self.execution_date) - self.logger.info(msg) + self.log.info(msg) return True @provide_session @@ -1434,7 +1434,7 @@ class TaskInstance(Base, LoggingMixin): def signal_handler(signum, frame): """Setting kill signal handler""" - self.logger.error("Killing subprocess") + self.log.error("Killing subprocess") task_copy.on_kill() raise AirflowException("Task received SIGTERM signal") signal.signal(signal.SIGTERM, signal_handler) @@ -1513,8 +1513,8 @@ class TaskInstance(Base, LoggingMixin): if task.on_success_callback: task.on_success_callback(context) except Exception as e3: - self.logger.error("Failed when executing success callback") - self.logger.exception(e3) + self.log.error("Failed when executing success callback") + self.log.exception(e3) session.commit() @@ -1559,7 +1559,7 @@ class TaskInstance(Base, LoggingMixin): task_copy.dry_run() def handle_failure(self, error, test_mode=False, context=None): - self.logger.exception(error) + self.log.exception(error) task = self.task session = settings.Session() self.end_date = datetime.now() @@ -1580,20 +1580,20 @@ class TaskInstance(Base, LoggingMixin): # next task instance try_number exceeds the max_tries. if task.retries and self.try_number <= self.max_tries: self.state = State.UP_FOR_RETRY - self.logger.info('Marking task as UP_FOR_RETRY') + self.log.info('Marking task as UP_FOR_RETRY') if task.email_on_retry and task.email: self.email_alert(error, is_retry=True) else: self.state = State.FAILED if task.retries: - self.logger.info('All retries failed; marking task as FAILED') + self.log.info('All retries failed; marking task as FAILED') else: - self.logger.info('Marking task as FAILED.') + self.log.info('Marking task as FAILED.') if task.email_on_failure and task.email: self.email_alert(error, is_retry=False) except Exception as e2: - self.logger.error('Failed to send email to: %s', task.email) - self.logger.exception(e2) + self.log.error('Failed to send email to: %s', task.email) + self.log.exception(e2) # Handling callbacks pessimistically try: @@ -1602,13 +1602,13 @@ class TaskInstance(Base, LoggingMixin): if self.state == State.FAILED and task.on_failure_callback: task.on_failure_callback(context) except Exception as e3: - self.logger.error("Failed at executing callback") - self.logger.exception(e3) + self.log.error("Failed at executing callback") + self.log.exception(e3) if not test_mode: session.merge(self) session.commit() - self.logger.error(str(error)) + self.log.error(str(error)) @provide_session def get_template_context(self, session=None): @@ -1898,7 +1898,7 @@ class Log(Base): self.owner = owner or task_owner -class SkipMixin(object): +class SkipMixin(LoggingMixin): def skip(self, dag_run, execution_date, tasks): """ Sets tasks instances to skipped from the same dag run. @@ -1926,7 +1926,7 @@ class SkipMixin(object): else: assert execution_date is not None, "Execution date is None and no dag run" - self.logger.warning("No DAG RUN present this should not happen") + self.log.warning("No DAG RUN present this should not happen") # this is defensive against dag runs that are not complete for task in tasks: ti = TaskInstance(task, execution_date=execution_date) @@ -2121,7 +2121,7 @@ class BaseOperator(LoggingMixin): self.email_on_failure = email_on_failure self.start_date = start_date if start_date and not isinstance(start_date, datetime): - self.logger.warning("start_date for %s isn't datetime.datetime", self) + self.log.warning("start_date for %s isn't datetime.datetime", self) self.end_date = end_date if not TriggerRule.is_valid(trigger_rule): raise AirflowException( @@ -2137,7 +2137,7 @@ class BaseOperator(LoggingMixin): self.depends_on_past = True if schedule_interval: - self.logger.warning( + self.log.warning( "schedule_interval is used for {}, though it has " "been deprecated as a task parameter, you need to " "specify it as a DAG parameter instead", @@ -2155,7 +2155,7 @@ class BaseOperator(LoggingMixin): if isinstance(retry_delay, timedelta): self.retry_delay = retry_delay else: - self.logger.debug("Retry_delay isn't timedelta object, assuming secs") + self.log.debug("Retry_delay isn't timedelta object, assuming secs") self.retry_delay = timedelta(seconds=retry_delay) self.retry_exponential_backoff = retry_exponential_backoff self.max_retry_delay = max_retry_delay @@ -2455,7 +2455,7 @@ class BaseOperator(LoggingMixin): try: setattr(self, attr, env.loader.get_source(env, content)[0]) except Exception as e: - self.logger.exception(e) + self.log.exception(e) self.prepare_template() @property @@ -2574,12 +2574,12 @@ class BaseOperator(LoggingMixin): ignore_ti_state=ignore_ti_state) def dry_run(self): - self.logger.info('Dry run') + self.log.info('Dry run') for attr in self.template_fields: content = getattr(self, attr) if content and isinstance(content, six.string_types): - self.logger.info('Rendering template for %s', attr) - self.logger.info(content) + self.log.info('Rendering template for %s', attr) + self.log.info(content) def get_direct_relatives(self, upstream=False): """ @@ -3517,7 +3517,7 @@ class DAG(BaseDag, LoggingMixin): d['pickle_len'] = len(pickled) d['pickling_duration'] = "{}".format(datetime.now() - dttm) except Exception as e: - self.logger.debug(e) + self.log.debug(e) d['is_picklable'] = False d['stacktrace'] = traceback.format_exc() return d @@ -3754,7 +3754,7 @@ class DAG(BaseDag, LoggingMixin): DagModel).filter(DagModel.dag_id == self.dag_id).first() if not orm_dag: orm_dag = DagModel(dag_id=self.dag_id) - self.logger.info("Creating ORM DAG for %s", self.dag_id) + self.log.info("Creating ORM DAG for %s", self.dag_id) orm_dag.fileloc = self.fileloc orm_dag.is_subdag = self.is_subdag orm_dag.owners = owner @@ -3797,11 +3797,11 @@ class DAG(BaseDag, LoggingMixin): :type expiration_date: datetime :return: None """ - logger = LoggingMixin().logger + log = LoggingMixin().log for dag in session.query( DagModel).filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active).all(): - logger.info( + log.info( "Deactivating DAG ID %s since it was last touched by the scheduler at %s", dag.dag_id, dag.last_scheduler_run.isoformat() ) @@ -3930,7 +3930,7 @@ class Variable(Base, LoggingMixin): self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = True except AirflowException: - self.logger.exception( + self.log.exception( "Failed to load fernet while encrypting value, using non-encrypted value." ) self._val = value @@ -4052,7 +4052,7 @@ class XCom(Base, LoggingMixin): try: value = json.dumps(value).encode('UTF-8') except ValueError: - log = LoggingMixin().logger + log = LoggingMixin().log log.error("Could not serialize the XCOM value into JSON. " "If you are using pickles instead of JSON " "for XCOM, then you need to enable pickle " @@ -4123,7 +4123,7 @@ class XCom(Base, LoggingMixin): try: return json.loads(result.value.decode('UTF-8')) except ValueError: - log = LoggingMixin().logger + log = LoggingMixin().log log.error("Could not serialize the XCOM value into JSON. " "If you are using pickles instead of JSON " "for XCOM, then you need to enable pickle " @@ -4173,7 +4173,7 @@ class XCom(Base, LoggingMixin): try: result.value = json.loads(result.value.decode('UTF-8')) except ValueError: - log = LoggingMixin().logger + log = LoggingMixin().log log.error("Could not serialize the XCOM value into JSON. " "If you are using pickles instead of JSON " "for XCOM, then you need to enable pickle " @@ -4229,7 +4229,7 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - log = LoggingMixin().logger + log = LoggingMixin().log log.warning("Could not update dag stats for %s", dag_id) log.exception(e) @@ -4282,7 +4282,7 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - log = LoggingMixin().logger + log = LoggingMixin().log log.warning("Could not update dag stat table") log.exception(e) @@ -4306,7 +4306,7 @@ class DagStat(Base): session.commit() except Exception as e: session.rollback() - log = LoggingMixin().logger + log = LoggingMixin().log log.warning("Could not create stat record") log.exception(e) @@ -4524,7 +4524,7 @@ class DagRun(Base, LoggingMixin): tis = self.get_task_instances(session=session) - self.logger.info("Updating state for %s considering %s task(s)", self, len(tis)) + self.log.info("Updating state for %s considering %s task(s)", self, len(tis)) for ti in list(tis): # skip in db? @@ -4570,18 +4570,18 @@ class DagRun(Base, LoggingMixin): # if all roots finished and at least on failed, the run failed if (not unfinished_tasks and any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): - self.logger.info('Marking run %s failed', self) + self.log.info('Marking run %s failed', self) self.state = State.FAILED # if all roots succeeded and no unfinished tasks, the run succeeded elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) for r in roots): - self.logger.info('Marking run %s successful', self) + self.log.info('Marking run %s successful', self) self.state = State.SUCCESS # if *all tasks* are deadlocked, the run failed elif unfinished_tasks and none_depends_on_past and no_dependencies_met: - self.logger.info('Deadlock; marking run %s failed', self) + self.log.info('Deadlock; marking run %s failed', self) self.state = State.FAILED # finally, if the roots aren't done, the dag is still running
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index 63321fb..ff2ed51 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -67,7 +67,7 @@ class BashOperator(BaseOperator): which will be cleaned afterwards """ bash_command = self.bash_command - self.logger.info("Tmp dir root location: \n %s", gettempdir()) + self.log.info("Tmp dir root location: \n %s", gettempdir()) with TemporaryDirectory(prefix='airflowtmp') as tmp_dir: with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f: @@ -75,11 +75,11 @@ class BashOperator(BaseOperator): f.flush() fname = f.name script_location = tmp_dir + "/" + fname - self.logger.info( + self.log.info( "Temporary script location: %s", script_location ) - self.logger.info("Running command: %s", bash_command) + self.log.info("Running command: %s", bash_command) sp = Popen( ['bash', fname], stdout=PIPE, stderr=STDOUT, @@ -88,13 +88,13 @@ class BashOperator(BaseOperator): self.sp = sp - self.logger.info("Output:") + self.log.info("Output:") line = '' for line in iter(sp.stdout.readline, b''): line = line.decode(self.output_encoding).strip() - self.logger.info(line) + self.log.info(line) sp.wait() - self.logger.info( + self.log.info( "Command exited with return code %s", sp.returncode ) @@ -106,6 +106,6 @@ class BashOperator(BaseOperator): return line def on_kill(self): - self.logger.info('Sending SIGTERM signal to bash process group') + self.log.info('Sending SIGTERM signal to bash process group') os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index f263a2c..ff82539 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -71,15 +71,15 @@ class CheckOperator(BaseOperator): self.sql = sql def execute(self, context=None): - self.logger.info('Executing SQL check: %s', self.sql) + self.log.info('Executing SQL check: %s', self.sql) records = self.get_db_hook().get_first(self.sql) - self.logger.info('Record: %s', records) + self.log.info('Record: %s', records) if not records: raise AirflowException("The query returned None") elif not all([bool(r) for r in records]): exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}" raise AirflowException(exceptstr.format(q=self.sql, r=records)) - self.logger.info("Success.") + self.log.info("Success.") def get_db_hook(self): return BaseHook.get_hook(conn_id=self.conn_id) @@ -134,7 +134,7 @@ class ValueCheckOperator(BaseOperator): self.has_tolerance = self.tol is not None def execute(self, context=None): - self.logger.info('Executing SQL check: %s', self.sql) + self.log.info('Executing SQL check: %s', self.sql) records = self.get_db_hook().get_first(self.sql) if not records: raise AirflowException("The query returned None") @@ -208,9 +208,9 @@ class IntervalCheckOperator(BaseOperator): def execute(self, context=None): hook = self.get_db_hook() - self.logger.info('Executing SQL check: %s', self.sql2) + self.log.info('Executing SQL check: %s', self.sql2) row2 = hook.get_first(self.sql2) - self.logger.info('Executing SQL check: %s', self.sql1) + self.log.info('Executing SQL check: %s', self.sql1) row1 = hook.get_first(self.sql1) if not row2: raise AirflowException("The query {q} returned None".format(q=self.sql2)) @@ -230,20 +230,20 @@ class IntervalCheckOperator(BaseOperator): else: ratio = float(max(current[m], reference[m])) / \ min(current[m], reference[m]) - self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m])) + self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m])) ratios[m] = ratio test_results[m] = ratio < self.metrics_thresholds[m] if not all(test_results.values()): failed_tests = [it[0] for it in test_results.items() if not it[1]] j = len(failed_tests) n = len(self.metrics_sorted) - self.logger.warning(countstr.format(**locals())) + self.log.warning(countstr.format(**locals())) for k in failed_tests: - self.logger.warning( + self.log.warning( fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k]) ) raise AirflowException(estr.format(", ".join(failed_tests))) - self.logger.info("All tests have passed") + self.log.info("All tests have passed") def get_db_hook(self): return BaseHook.get_hook(conn_id=self.conn_id) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/dagrun_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index bd2862b..3a952cd 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -69,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator): state=State.RUNNING, conf=dro.payload, external_trigger=True) - self.logger.info("Creating DagRun %s", dr) + self.log.info("Creating DagRun %s", dr) session.add(dr) session.commit() session.close() else: - self.logger.info("Criteria not met, moving on") + self.log.info("Criteria not met, moving on") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/docker_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index 8a333d6..3011f1c 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -134,7 +134,7 @@ class DockerOperator(BaseOperator): self.container = None def execute(self, context): - self.logger.info('Starting docker container from image %s', self.image) + self.log.info('Starting docker container from image %s', self.image) tls_config = None if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key: @@ -155,10 +155,10 @@ class DockerOperator(BaseOperator): image = self.image if self.force_pull or len(self.cli.images(name=image)) == 0: - self.logger.info('Pulling docker image %s', image) + self.log.info('Pulling docker image %s', image) for l in self.cli.pull(image, stream=True): output = json.loads(l.decode('utf-8')) - self.logger.info("%s", output['status']) + self.log.info("%s", output['status']) cpu_shares = int(round(self.cpus * 1024)) @@ -184,7 +184,7 @@ class DockerOperator(BaseOperator): line = line.strip() if hasattr(line, 'decode'): line = line.decode('utf-8') - self.logger.info(line) + self.log.info(line) exit_code = self.cli.wait(self.container['Id']) if exit_code != 0: @@ -202,5 +202,5 @@ class DockerOperator(BaseOperator): def on_kill(self): if self.cli is not None: - self.logger.info('Stopping docker container') + self.log.info('Stopping docker container') self.cli.stop(self.container['Id']) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/generic_transfer.py ---------------------------------------------------------------------- diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py index 790749a..c8a2a58 100644 --- a/airflow/operators/generic_transfer.py +++ b/airflow/operators/generic_transfer.py @@ -61,15 +61,15 @@ class GenericTransfer(BaseOperator): def execute(self, context): source_hook = BaseHook.get_hook(self.source_conn_id) - self.logger.info("Extracting data from %s", self.source_conn_id) - self.logger.info("Executing: \n %s", self.sql) + self.log.info("Extracting data from %s", self.source_conn_id) + self.log.info("Executing: \n %s", self.sql) results = source_hook.get_records(self.sql) destination_hook = BaseHook.get_hook(self.destination_conn_id) if self.preoperator: - self.logger.info("Running preoperator") - self.logger.info(self.preoperator) + self.log.info("Running preoperator") + self.log.info(self.preoperator) destination_hook.run(self.preoperator) - self.logger.info("Inserting rows into %s", self.destination_conn_id) + self.log.info("Inserting rows into %s", self.destination_conn_id) destination_hook.insert_rows(table=self.destination_table, rows=results) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py index 983069b..221feeb 100644 --- a/airflow/operators/hive_operator.py +++ b/airflow/operators/hive_operator.py @@ -93,7 +93,7 @@ class HiveOperator(BaseOperator): self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:]) def execute(self, context): - self.logger.info('Executing: %s', self.hql) + self.log.info('Executing: %s', self.hql) self.hook = self.get_hook() self.hook.run_cli(hql=self.hql, schema=self.schema, hive_conf=context_to_airflow_vars(context)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_stats_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py index 025e427..896547e 100644 --- a/airflow/operators/hive_stats_operator.py +++ b/airflow/operators/hive_stats_operator.py @@ -139,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator): """.format(**locals()) hook = PrestoHook(presto_conn_id=self.presto_conn_id) - self.logger.info('Executing SQL check: %s', sql) + self.log.info('Executing SQL check: %s', sql) row = hook.get_first(hql=sql) - self.logger.info("Record: %s", row) + self.log.info("Record: %s", row) if not row: raise AirflowException("The query returned None") part_json = json.dumps(self.partition, sort_keys=True) - self.logger.info("Deleting rows from previous runs if they exist") + self.log.info("Deleting rows from previous runs if they exist") mysql = MySqlHook(self.mysql_conn_id) sql = """ SELECT 1 FROM hive_stats @@ -167,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator): """.format(**locals()) mysql.run(sql) - self.logger.info("Pivoting and loading cells into the Airflow db") + self.log.info("Pivoting and loading cells into the Airflow db") rows = [ (self.ds, self.dttm, self.table, part_json) + (r[0][0], r[0][1], r[1]) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_druid.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py index d7b1b82..e420dfd 100644 --- a/airflow/operators/hive_to_druid.py +++ b/airflow/operators/hive_to_druid.py @@ -92,7 +92,7 @@ class HiveToDruidTransfer(BaseOperator): def execute(self, context): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) - self.logger.info("Extracting data from Hive") + self.log.info("Extracting data from Hive") hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_') sql = self.sql.strip().strip(';') tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()]) @@ -107,7 +107,7 @@ class HiveToDruidTransfer(BaseOperator): AS {sql} """.format(**locals()) - self.logger.info("Running command:\n %s", hql) + self.log.info("Running command:\n %s", hql) hive.run_cli(hql) m = HiveMetastoreHook(self.metastore_conn_id) @@ -131,13 +131,13 @@ class HiveToDruidTransfer(BaseOperator): columns=columns, ) - self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path) + self.log.info("Inserting rows into Druid, hdfs path: %s", static_path) druid.submit_indexing_job(index_spec) - self.logger.info("Load seems to have succeeded!") + self.log.info("Load seems to have succeeded!") finally: - self.logger.info( + self.log.info( "Cleaning up by dropping the temp Hive table %s", hive_table ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_mysql.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py index e82a099..d2d9d0c 100644 --- a/airflow/operators/hive_to_mysql.py +++ b/airflow/operators/hive_to_mysql.py @@ -77,7 +77,7 @@ class HiveToMySqlTransfer(BaseOperator): def execute(self, context): hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) - self.logger.info("Extracting data from Hive: %s", self.sql) + self.log.info("Extracting data from Hive: %s", self.sql) if self.bulk_load: tmpfile = NamedTemporaryFile() @@ -88,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator): mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) if self.mysql_preoperator: - self.logger.info("Running MySQL preoperator") + self.log.info("Running MySQL preoperator") mysql.run(self.mysql_preoperator) - self.logger.info("Inserting rows into MySQL") + self.log.info("Inserting rows into MySQL") if self.bulk_load: mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name) @@ -100,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator): mysql.insert_rows(table=self.mysql_table, rows=results) if self.mysql_postoperator: - self.logger.info("Running MySQL postoperator") + self.log.info("Running MySQL postoperator") mysql.run(self.mysql_postoperator) - self.logger.info("Done.") + self.log.info("Done.") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_samba_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py index d6e6dec..93ebec1 100644 --- a/airflow/operators/hive_to_samba_operator.py +++ b/airflow/operators/hive_to_samba_operator.py @@ -53,7 +53,7 @@ class Hive2SambaOperator(BaseOperator): samba = SambaHook(samba_conn_id=self.samba_conn_id) hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id) tmpfile = tempfile.NamedTemporaryFile() - self.logger.info("Fetching file from Hive") + self.log.info("Fetching file from Hive") hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name) - self.logger.info("Pushing to samba") + self.log.info("Pushing to samba") samba.push_from_local(self.destination_filepath, tmpfile.name) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py index d92c931..63b892c 100644 --- a/airflow/operators/http_operator.py +++ b/airflow/operators/http_operator.py @@ -74,7 +74,7 @@ class SimpleHttpOperator(BaseOperator): def execute(self, context): http = HttpHook(self.method, http_conn_id=self.http_conn_id) - self.logger.info("Calling HTTP method") + self.log.info("Calling HTTP method") response = http.run(self.endpoint, self.data, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/jdbc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 942e312..4ec2fa0 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -55,6 +55,6 @@ class JdbcOperator(BaseOperator): self.autocommit = autocommit def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id) self.hook.run(self.sql, self.autocommit, parameters=self.parameters) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/latest_only_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py index 58f7e67..a1e2a0c 100644 --- a/airflow/operators/latest_only_operator.py +++ b/airflow/operators/latest_only_operator.py @@ -32,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin): # If the DAG Run is externally triggered, then return without # skipping downstream tasks if context['dag_run'] and context['dag_run'].external_trigger: - self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.") + self.log.info("Externally triggered DAG_Run: allowing execution to proceed.") return now = datetime.datetime.now() left_window = context['dag'].following_schedule( context['execution_date']) right_window = context['dag'].following_schedule(left_window) - self.logger.info( + self.log.info( 'Checking latest only with left_window: %s right_window: %s now: %s', left_window, right_window, now ) if not left_window < now <= right_window: - self.logger.info('Not latest execution, skipping downstream.') + self.log.info('Not latest execution, skipping downstream.') downstream_tasks = context['task'].get_flat_relatives(upstream=False) - self.logger.debug("Downstream task_ids %s", downstream_tasks) + self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) - self.logger.info('Done.') + self.log.info('Done.') else: - self.logger.info('Latest, allowing execution to proceed.') + self.log.info('Latest, allowing execution to proceed.') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py index bc0822f..3455232 100644 --- a/airflow/operators/mssql_operator.py +++ b/airflow/operators/mssql_operator.py @@ -44,7 +44,7 @@ class MsSqlOperator(BaseOperator): self.database = database def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database) hook.run(self.sql, autocommit=self.autocommit, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py index 719ddd2..c2c858d 100644 --- a/airflow/operators/mssql_to_hive.py +++ b/airflow/operators/mssql_to_hive.py @@ -102,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id) - self.logger.info("Dumping Microsoft SQL Server query results to local file") + self.log.info("Dumping Microsoft SQL Server query results to local file") conn = mssql.get_conn() cursor = conn.cursor() cursor.execute(self.sql) @@ -118,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator): f.flush() cursor.close() conn.close() - self.logger.info("Loading file into Hive") + self.log.info("Loading file into Hive") hive.load_file( f.name, self.hive_table, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py index 923eaf8..20f1b7e 100644 --- a/airflow/operators/mysql_operator.py +++ b/airflow/operators/mysql_operator.py @@ -46,7 +46,7 @@ class MySqlOperator(BaseOperator): self.database = database def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database) hook.run( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py index fde92b5..cd472a8 100644 --- a/airflow/operators/mysql_to_hive.py +++ b/airflow/operators/mysql_to_hive.py @@ -110,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator): hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) - self.logger.info("Dumping MySQL query results to local file") + self.log.info("Dumping MySQL query results to local file") conn = mysql.get_conn() cursor = conn.cursor() cursor.execute(self.sql) @@ -123,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator): f.flush() cursor.close() conn.close() - self.logger.info("Loading file into Hive") + self.log.info("Loading file into Hive") hive.load_file( f.name, self.hive_table, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/oracle_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py index f87bbf9..9a35267 100644 --- a/airflow/operators/oracle_operator.py +++ b/airflow/operators/oracle_operator.py @@ -42,7 +42,7 @@ class OracleOperator(BaseOperator): self.parameters = parameters def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) hook = OracleHook(oracle_conn_id=self.oracle_conn_id) hook.run( self.sql, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/pig_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py index cdce48a..a4e4e5c 100644 --- a/airflow/operators/pig_operator.py +++ b/airflow/operators/pig_operator.py @@ -59,7 +59,7 @@ class PigOperator(BaseOperator): "(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig) def execute(self, context): - self.logger.info('Executing: %s', self.pig) + self.log.info('Executing: %s', self.pig) self.hook = self.get_hook() self.hook.run_cli(pig=self.pig) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/postgres_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py index 55c1573..c93dc7b 100644 --- a/airflow/operators/postgres_operator.py +++ b/airflow/operators/postgres_operator.py @@ -49,7 +49,7 @@ class PostgresOperator(BaseOperator): self.database = database def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database) self.hook.run(self.sql, self.autocommit, parameters=self.parameters) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/presto_to_mysql.py ---------------------------------------------------------------------- diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py index 48158ca..d0c323a 100644 --- a/airflow/operators/presto_to_mysql.py +++ b/airflow/operators/presto_to_mysql.py @@ -61,14 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator): def execute(self, context): presto = PrestoHook(presto_conn_id=self.presto_conn_id) - self.logger.info("Extracting data from Presto: %s", self.sql) + self.log.info("Extracting data from Presto: %s", self.sql) results = presto.get_records(self.sql) mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) if self.mysql_preoperator: - self.logger.info("Running MySQL preoperator") - self.logger.info(self.mysql_preoperator) + self.log.info("Running MySQL preoperator") + self.log.info(self.mysql_preoperator) mysql.run(self.mysql_preoperator) - self.logger.info("Inserting rows into MySQL") + self.log.info("Inserting rows into MySQL") mysql.insert_rows(table=self.mysql_table, rows=results) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 56837ec..718c88f 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -87,7 +87,7 @@ class PythonOperator(BaseOperator): self.op_kwargs = context return_value = self.execute_callable() - self.logger.info("Done. Returned value was: %s", return_value) + self.log.info("Done. Returned value was: %s", return_value) return return_value def execute_callable(self): @@ -115,17 +115,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin): """ def execute(self, context): branch = super(BranchPythonOperator, self).execute(context) - self.logger.info("Following branch %s", branch) - self.logger.info("Marking other directly downstream tasks as skipped") + self.log.info("Following branch %s", branch) + self.log.info("Marking other directly downstream tasks as skipped") downstream_tasks = context['task'].downstream_list - self.logger.debug("Downstream task_ids %s", downstream_tasks) + self.log.debug("Downstream task_ids %s", downstream_tasks) skip_tasks = [t for t in downstream_tasks if t.task_id != branch] if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks) - self.logger.info("Done.") + self.log.info("Done.") class ShortCircuitOperator(PythonOperator, SkipMixin): @@ -142,21 +142,21 @@ class ShortCircuitOperator(PythonOperator, SkipMixin): """ def execute(self, context): condition = super(ShortCircuitOperator, self).execute(context) - self.logger.info("Condition result is %s", condition) + self.log.info("Condition result is %s", condition) if condition: - self.logger.info('Proceeding with downstream tasks...') + self.log.info('Proceeding with downstream tasks...') return - self.logger.info('Skipping downstream tasks...') + self.log.info('Skipping downstream tasks...') downstream_tasks = context['task'].get_flat_relatives(upstream=False) - self.logger.debug("Downstream task_ids %s", downstream_tasks) + self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) - self.logger.info("Done.") + self.log.info("Done.") class PythonVirtualenvOperator(PythonOperator): """ @@ -233,7 +233,7 @@ class PythonVirtualenvOperator(PythonOperator): # generate filenames input_filename = os.path.join(tmp_dir, 'script.in') output_filename = os.path.join(tmp_dir, 'script.out') - string_args_filename = os.path.join(tmp_dir, 'string_args.txt') + string_args_filename = os.path.join(tmp_dir, 'string_args.txt') script_filename = os.path.join(tmp_dir, 'script.py') # set up virtualenv @@ -261,12 +261,12 @@ class PythonVirtualenvOperator(PythonOperator): def _execute_in_subprocess(self, cmd): try: - self.logger.info("Executing cmd\n{}".format(cmd)) + self.log.info("Executing cmd\n{}".format(cmd)) output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if output: - self.logger.info("Got output\n{}".format(output)) + self.log.info("Got output\n{}".format(output)) except subprocess.CalledProcessError as e: - self.logger.info("Got error output\n{}".format(e.output)) + self.log.info("Got error output\n{}".format(e.output)) raise def _write_string_args(self, filename): @@ -294,14 +294,14 @@ class PythonVirtualenvOperator(PythonOperator): else: return pickle.load(f) except ValueError: - self.logger.error("Error deserializing result. Note that result deserialization " + self.log.error("Error deserializing result. Note that result deserialization " "is not supported across major Python versions.") raise def _write_script(self, script_filename): with open(script_filename, 'w') as f: python_code = self._generate_python_code() - self.logger.debug('Writing code to file\n{}'.format(python_code)) + self.log.debug('Writing code to file\n{}'.format(python_code)) f.write(python_code) def _generate_virtualenv_cmd(self, tmp_dir): @@ -323,7 +323,7 @@ class PythonVirtualenvOperator(PythonOperator): def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename): # direct path alleviates need to activate return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename] - + def _generate_python_code(self): if self.use_dill: pickling_library = 'dill' @@ -354,3 +354,5 @@ class PythonVirtualenvOperator(PythonOperator): python_callable_name=fn.__name__, pickling_library=pickling_library) + self.log.info("Done.") + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/redshift_to_s3_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index e25d613..683ff9c 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -70,7 +70,7 @@ class RedshiftToS3Transfer(BaseOperator): a_key, s_key = self.s3.get_credentials() unload_options = '\n\t\t\t'.join(self.unload_options) - self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table) + self.log.info("Retrieving headers from %s.%s...", self.schema, self.table) columns_query = """SELECT column_name FROM information_schema.columns @@ -99,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator): """.format(column_names, column_castings, self.schema, self.table, self.s3_bucket, self.s3_key, a_key, s_key, unload_options) - self.logger.info('Executing UNLOAD command...') + self.log.info('Executing UNLOAD command...') self.hook.run(unload_query, self.autocommit) - self.logger.info("UNLOAD command complete...") + self.log.info("UNLOAD command complete...") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_file_transform_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index 5de5127..68c733c 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -74,12 +74,12 @@ class S3FileTransformOperator(BaseOperator): def execute(self, context): source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id) dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id) - self.logger.info("Downloading source S3 file %s", self.source_s3_key) + self.log.info("Downloading source S3 file %s", self.source_s3_key) if not source_s3.check_for_key(self.source_s3_key): raise AirflowException("The source key {0} does not exist".format(self.source_s3_key)) source_s3_key_object = source_s3.get_key(self.source_s3_key) with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest: - self.logger.info( + self.log.info( "Dumping S3 file %s contents to local file %s", self.source_s3_key, f_source.name ) @@ -90,20 +90,20 @@ class S3FileTransformOperator(BaseOperator): [self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate() - self.logger.info("Transform script stdout %s", transform_script_stdoutdata) + self.log.info("Transform script stdout %s", transform_script_stdoutdata) if transform_script_process.returncode > 0: raise AirflowException("Transform script failed %s", transform_script_stderrdata) else: - self.logger.info( + self.log.info( "Transform script successful. Output temporarily located at %s", f_dest.name ) - self.logger.info("Uploading transformed file to S3") + self.log.info("Uploading transformed file to S3") f_dest.flush() dest_s3.load_file( filename=f_dest.name, key=self.dest_s3_key, replace=self.replace ) - self.logger.info("Upload successful") + self.log.info("Upload successful") dest_s3.connection.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_to_hive_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py index 68fe903..2b4aceb 100644 --- a/airflow/operators/s3_to_hive_operator.py +++ b/airflow/operators/s3_to_hive_operator.py @@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator): # Downloading file from S3 self.s3 = S3Hook(s3_conn_id=self.s3_conn_id) self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id) - self.logger.info("Downloading S3 file") + self.log.info("Downloading S3 file") if self.wildcard_match: if not self.s3.check_for_wildcard_key(self.s3_key): @@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator): NamedTemporaryFile(mode="w", dir=tmp_dir, suffix=file_ext) as f: - self.logger.info("Dumping S3 key {0} contents to local file {1}" - .format(s3_key_object.key, f.name)) + self.log.info("Dumping S3 key {0} contents to local file {1}" + .format(s3_key_object.key, f.name)) s3_key_object.get_contents_to_file(f) f.flush() self.s3.connection.close() if not self.headers: - self.logger.info("Loading file %s into Hive", f.name) + self.log.info("Loading file %s into Hive", f.name) self.hive.load_file( f.name, self.hive_table, @@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator): else: # Decompressing file if self.input_compressed: - self.logger.info("Uncompressing file %s", f.name) + self.log.info("Uncompressing file %s", f.name) fn_uncompressed = uncompress_file(f.name, file_ext, tmp_dir) - self.logger.info("Uncompressed to %s", fn_uncompressed) + self.log.info("Uncompressed to %s", fn_uncompressed) # uncompressed file available now so deleting # compressed file to save disk space f.close() @@ -178,19 +178,19 @@ class S3ToHiveTransfer(BaseOperator): # Testing if header matches field_dict if self.check_headers: - self.logger.info("Matching file header against field_dict") + self.log.info("Matching file header against field_dict") header_list = self._get_top_row_as_list(fn_uncompressed) if not self._match_headers(header_list): raise AirflowException("Header check failed") # Deleting top header row - self.logger.info("Removing header from file %s", fn_uncompressed) + self.log.info("Removing header from file %s", fn_uncompressed) headless_file = ( self._delete_top_row_and_compress(fn_uncompressed, file_ext, tmp_dir)) - self.logger.info("Headless file %s", headless_file) - self.logger.info("Loading file %s into Hive", headless_file) + self.log.info("Headless file %s", headless_file) + self.log.info("Loading file %s into Hive", headless_file) self.hive.load_file(headless_file, self.hive_table, field_dict=self.field_dict, @@ -211,7 +211,7 @@ class S3ToHiveTransfer(BaseOperator): raise AirflowException("Unable to retrieve header row from file") field_names = self.field_dict.keys() if len(field_names) != len(header_list): - self.logger.warning("Headers count mismatch" + self.log.warning("Headers count mismatch" "File headers:\n {header_list}\n" "Field names: \n {field_names}\n" "".format(**locals())) @@ -219,7 +219,7 @@ class S3ToHiveTransfer(BaseOperator): test_field_match = [h1.lower() == h2.lower() for h1, h2 in zip(header_list, field_names)] if not all(test_field_match): - self.logger.warning("Headers do not match field names" + self.log.warning("Headers do not match field names" "File headers:\n {header_list}\n" "Field names: \n {field_names}\n" "".format(**locals())) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index ea301dc..b5c43c2 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -15,7 +15,7 @@ from __future__ import print_function from future import standard_library -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin standard_library.install_aliases() from builtins import str @@ -82,7 +82,7 @@ class BaseSensorOperator(BaseOperator): else: raise AirflowSensorTimeout('Snap. Time is OUT.') sleep(self.poke_interval) - self.logger.info("Success criteria met. Exiting.") + self.log.info("Success criteria met. Exiting.") class SqlSensor(BaseSensorOperator): @@ -108,7 +108,7 @@ class SqlSensor(BaseSensorOperator): def poke(self, context): hook = BaseHook.get_connection(self.conn_id).get_hook() - self.logger.info('Poking: %s', self.sql) + self.log.info('Poking: %s', self.sql) records = hook.get_records(self.sql) if not records: return False @@ -237,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator): serialized_dttm_filter = ','.join( [datetime.isoformat() for datetime in dttm_filter]) - self.logger.info( + self.log.info( 'Poking for ' '{self.external_dag_id}.' '{self.external_task_id} on ' @@ -313,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator): schema, table, partition = self.parse_partition_name(partition) - self.logger.info( + self.log.info( 'Poking for {schema}.{table}/{partition}'.format(**locals()) ) return self.hook.check_for_named_partition( @@ -371,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator): def poke(self, context): if '.' in self.table: self.schema, self.table = self.table.split('.') - self.logger.info( + self.log.info( 'Poking for table {self.schema}.{self.table}, ' 'partition {self.partition}'.format(**locals())) if not hasattr(self, 'hook'): @@ -417,7 +417,7 @@ class HdfsSensor(BaseSensorOperator): :return: (bool) depending on the matching criteria """ if size: - log = LoggingMixin().logger + log = LoggingMixin().log log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result)) size *= settings.MEGABYTE result = [x for x in result if x['length'] >= size] @@ -435,7 +435,7 @@ class HdfsSensor(BaseSensorOperator): :return: (list) of dicts which were not removed """ if ignore_copying: - log = LoggingMixin().logger + log = LoggingMixin().log regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext) ignored_extentions_regex = re.compile(regex_builder) log.debug( @@ -448,20 +448,20 @@ class HdfsSensor(BaseSensorOperator): def poke(self, context): sb = self.hook(self.hdfs_conn_id).get_conn() - self.logger.info('Poking for file {self.filepath}'.format(**locals())) + self.log.info('Poking for file {self.filepath}'.format(**locals())) try: # IMOO it's not right here, as there no raise of any kind. # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*', # it's not correct as the directory exists and sb does not raise any error # here is a quick fix result = [f for f in sb.ls([self.filepath], include_toplevel=False)] - self.logger.debug('HdfsSensor.poke: result is %s', result) + self.log.debug('HdfsSensor.poke: result is %s', result) result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying) result = self.filter_for_filesize(result, self.file_size) return bool(result) except: e = sys.exc_info() - self.logger.debug("Caught an exception !: %s", str(e)) + self.log.debug("Caught an exception !: %s", str(e)) return False @@ -484,7 +484,7 @@ class WebHdfsSensor(BaseSensorOperator): def poke(self, context): from airflow.hooks.webhdfs_hook import WebHDFSHook c = WebHDFSHook(self.webhdfs_conn_id) - self.logger.info('Poking for file {self.filepath}'.format(**locals())) + self.log.info('Poking for file {self.filepath}'.format(**locals())) return c.check_for_path(hdfs_path=self.filepath) @@ -535,7 +535,7 @@ class S3KeySensor(BaseSensorOperator): from airflow.hooks.S3_hook import S3Hook hook = S3Hook(s3_conn_id=self.s3_conn_id) full_url = "s3://" + self.bucket_name + "/" + self.bucket_key - self.logger.info('Poking for key : {full_url}'.format(**locals())) + self.log.info('Poking for key : {full_url}'.format(**locals())) if self.wildcard_match: return hook.check_for_wildcard_key(self.bucket_key, self.bucket_name) @@ -577,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator): self.s3_conn_id = s3_conn_id def poke(self, context): - self.logger.info('Poking for prefix : {self.prefix}\n' + self.log.info('Poking for prefix : {self.prefix}\n' 'in bucket s3://{self.bucket_name}'.format(**locals())) from airflow.hooks.S3_hook import S3Hook hook = S3Hook(s3_conn_id=self.s3_conn_id) @@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator): self.target_time = target_time def poke(self, context): - self.logger.info('Checking if the time (%s) has come', self.target_time) + self.log.info('Checking if the time (%s) has come', self.target_time) return datetime.now().time() > self.target_time @@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator): dag = context['dag'] target_dttm = dag.following_schedule(context['execution_date']) target_dttm += self.delta - self.logger.info('Checking if the time (%s) has come', target_dttm) + self.log.info('Checking if the time (%s) has come', target_dttm) return datetime.now() > target_dttm @@ -679,7 +679,7 @@ class HttpSensor(BaseSensorOperator): http_conn_id=http_conn_id) def poke(self, context): - self.logger.info('Poking: %s', self.endpoint) + self.log.info('Poking: %s', self.endpoint) try: response = self.hook.run(self.endpoint, data=self.request_params, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/slack_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py index 4f2d7bc..8b21211 100644 --- a/airflow/operators/slack_operator.py +++ b/airflow/operators/slack_operator.py @@ -67,7 +67,7 @@ class SlackAPIOperator(BaseOperator): rc = sc.api_call(self.method, **self.api_params) if not rc['ok']: msg = "Slack API call failed (%s)".format(rc['error']) - self.logger.error(msg) + self.log.error(msg) raise AirflowException(msg) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sqlite_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py index 7c85847..b32837d 100644 --- a/airflow/operators/sqlite_operator.py +++ b/airflow/operators/sqlite_operator.py @@ -41,6 +41,6 @@ class SqliteOperator(BaseOperator): self.parameters = parameters or [] def execute(self, context): - self.logger.info('Executing: %s', self.sql) + self.log.info('Executing: %s', self.sql) hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id) hook.run(self.sql, parameters=self.parameters) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/plugins_manager.py ---------------------------------------------------------------------- diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 7c1d246..d5c3407 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -25,9 +25,9 @@ import re import sys from airflow import configuration -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin -log = LoggingMixin().logger +log = LoggingMixin().log class AirflowPluginException(Exception): pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/security/kerberos.py ---------------------------------------------------------------------- diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index a9687b3..7a169b2 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -24,7 +24,7 @@ from airflow import configuration, LoggingMixin NEED_KRB181_WORKAROUND = None -log = LoggingMixin().logger +log = LoggingMixin().log def renew_from_kt(): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index cf1eca4..1e5e614 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -27,9 +27,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import NullPool from airflow import configuration as conf -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin -log = LoggingMixin().logger +log = LoggingMixin().log class DummyStatsLogger(object): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/base_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index 7794f4a..6a07db2 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -19,7 +19,7 @@ import json import subprocess import threading -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin from airflow import configuration as conf from tempfile import mkstemp @@ -39,7 +39,7 @@ class BaseTaskRunner(LoggingMixin): """ # Pass task instance context into log handlers to setup the logger. self._task_instance = local_task_job.task_instance - self.set_logger_contexts(self._task_instance) + self.set_log_contexts(self._task_instance) popen_prepend = [] cfg_path = None @@ -54,7 +54,7 @@ class BaseTaskRunner(LoggingMixin): # Add sudo commands to change user if we need to. Needed to handle SubDagOperator # case using a SequentialExecutor. if self.run_as_user and (self.run_as_user != getpass.getuser()): - self.logger.debug("Planning to run as the %s user", self.run_as_user) + self.log.debug("Planning to run as the %s user", self.run_as_user) cfg_dict = conf.as_dict(display_sensitive=True) cfg_subset = { 'core': cfg_dict.get('core', {}), @@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin): line = line.decode('utf-8') if len(line) == 0: break - self.logger.info('Subtask: %s', line.rstrip('\n')) + self.log.info('Subtask: %s', line.rstrip('\n')) def run_command(self, run_with, join_args=False): """ @@ -112,7 +112,7 @@ class BaseTaskRunner(LoggingMixin): """ cmd = [" ".join(self._command)] if join_args else self._command full_cmd = run_with + cmd - self.logger.info('Running: %s', full_cmd) + self.log.info('Running: %s', full_cmd) proc = subprocess.Popen( full_cmd, stdout=subprocess.PIPE, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/bash_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py index b73e258..109b44c 100644 --- a/airflow/task_runner/bash_task_runner.py +++ b/airflow/task_runner/bash_task_runner.py @@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner): def terminate(self): if self.process and psutil.pid_exists(self.process.pid): - kill_process_tree(self.logger, self.process.pid) + kill_process_tree(self.log, self.process.pid) def on_finish(self): super(BashTaskRunner, self).on_finish() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/dag_processing.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 6497fcc..cc64f68 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -27,7 +27,7 @@ from datetime import datetime from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SimpleDag(BaseDag): @@ -205,7 +205,7 @@ def list_py_file_paths(directory, safe_mode=True): file_paths.append(file_path) except Exception: - log = LoggingMixin().logger + log = LoggingMixin().log log.exception("Error while examining %s", f) return file_paths @@ -443,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin): if file_path in new_file_paths: filtered_processors[file_path] = processor else: - self.logger.warning("Stopping processor for %s", file_path) + self.log.warning("Stopping processor for %s", file_path) processor.stop() self._processors = filtered_processors @@ -519,7 +519,7 @@ class DagFileProcessorManager(LoggingMixin): os.symlink(log_directory, latest_log_directory_path) elif (os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path)): - self.logger.warning( + self.log.warning( "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path ) @@ -558,7 +558,7 @@ class DagFileProcessorManager(LoggingMixin): for file_path, processor in self._processors.items(): if processor.done: - self.logger.info("Processor for %s finished", file_path) + self.log.info("Processor for %s finished", file_path) now = datetime.now() finished_processors[file_path] = processor self._last_runtime[file_path] = (now - @@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin): simple_dags = [] for file_path, processor in finished_processors.items(): if processor.result is None: - self.logger.warning( + self.log.warning( "Processor for %s exited with return code %s. See %s for details.", processor.file_path, processor.exit_code, processor.log_file ) @@ -606,12 +606,12 @@ class DagFileProcessorManager(LoggingMixin): set(files_paths_at_run_limit)) for file_path, processor in self._processors.items(): - self.logger.debug( + self.log.debug( "File path %s is still being processed (started: %s)", processor.file_path, processor.start_time.isoformat() ) - self.logger.debug( + self.log.debug( "Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue) ) @@ -626,7 +626,7 @@ class DagFileProcessorManager(LoggingMixin): processor = self._processor_factory(file_path, log_file_path) processor.start() - self.logger.info( + self.log.info( "Started a process (PID: %s) to generate tasks for %s - logging into %s", processor.pid, file_path, log_file_path ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index c7e58e7..ef2560f 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -25,9 +25,9 @@ from sqlalchemy import event, exc from sqlalchemy.pool import Pool from airflow import settings -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin -log = LoggingMixin().logger +log = LoggingMixin().log def provide_session(func): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/email.py ---------------------------------------------------------------------- diff --git a/airflow/utils/email.py b/airflow/utils/email.py index f252d55..fadd4d5 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -31,7 +31,7 @@ from email.utils import formatdate from airflow import configuration from airflow.exceptions import AirflowConfigException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed'): @@ -88,7 +88,7 @@ def send_email_smtp(to, subject, html_content, files=None, dryrun=False, cc=None def send_MIME_email(e_from, e_to, mime_msg, dryrun=False): - log = LoggingMixin().logger + log = LoggingMixin().log SMTP_HOST = configuration.get('smtp', 'SMTP_HOST') SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/LoggingMixin.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/LoggingMixin.py b/airflow/utils/log/LoggingMixin.py deleted file mode 100644 index 4572d63..0000000 --- a/airflow/utils/log/LoggingMixin.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function -from __future__ import unicode_literals - -import logging -from builtins import object - - -class LoggingMixin(object): - """ - Convenience super-class to have a logger configured with the class name - """ - - @property - def logger(self): - try: - return self._logger - except AttributeError: - self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__) - return self._logger - - def set_logger_contexts(self, task_instance): - """ - Set the context for all handlers of current logger. - """ - for handler in self.logger.handlers: - try: - handler.set_context(task_instance) - except AttributeError: - pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index 0bc0b5e..dcdaf6d 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -15,7 +15,7 @@ import os from airflow import configuration from airflow.exceptions import AirflowException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.log.file_task_handler import FileTaskHandler @@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): google_cloud_storage_conn_id=remote_conn_id ) except: - self.logger.error( + self.log.error( 'Could not create a GoogleCloudStorageHook with connection id ' '"%s". Please make sure that airflow[gcp_api] is installed ' 'and the GCS connection exists.', remote_conn_id @@ -137,7 +137,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): # return error if needed if return_error: msg = 'Could not read logs from {}'.format(remote_log_location) - self.logger.error(msg) + self.log.error(msg) return msg def gcs_write(self, log, remote_log_location, append=True): @@ -167,7 +167,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): tmpfile.flush() self.hook.upload(bkt, blob, tmpfile.name) except: - self.logger.error('Could not write logs to %s', remote_log_location) + self.log.error('Could not write logs to %s', remote_log_location) def parse_gcs_url(self, gsurl): """
