[AIRFLOW-1604] Rename logger to log

In most popular languages, log is the de facto variable name for a logger. LoggingMixin.py is also renamed to logging_mixin.py so that the module name complies with the Python (PEP 8) naming standard.
Accessing the old .logger attribute now emits a deprecation warning.

Closes #2604 from Fokko/AIRFLOW-1604-logger-to-log

(cherry picked from commit eb2f589099b87743482c2eb16261b49e284dcd96)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/af405084
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/af405084
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/af405084

Branch: refs/heads/v1-9-test
Commit: af4050847a5fc354ccc1245f981b8773a5c65075
Parents: 14e6d7b
Author: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Authored: Tue Sep 19 10:17:14 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Sep 19 10:18:09 2017 +0200

----------------------------------------------------------------------
 airflow/__init__.py | 4 +-
 airflow/api/__init__.py | 4 +-
 airflow/api/auth/backend/kerberos_auth.py | 4 +-
 airflow/bin/cli.py | 10 +-
 airflow/configuration.py | 4 +-
 .../auth/backends/github_enterprise_auth.py | 4 +-
 airflow/contrib/auth/backends/google_auth.py | 4 +-
 airflow/contrib/auth/backends/kerberos_auth.py | 2 +-
 airflow/contrib/auth/backends/ldap_auth.py | 4 +-
 airflow/contrib/auth/backends/password_auth.py | 4 +-
 airflow/contrib/executors/mesos_executor.py | 34 +--
 airflow/contrib/hooks/bigquery_hook.py | 24 +--
 airflow/contrib/hooks/cloudant_hook.py | 4 +-
 airflow/contrib/hooks/databricks_hook.py | 8 +-
 airflow/contrib/hooks/datadog_hook.py | 6 +-
 airflow/contrib/hooks/datastore_hook.py | 4 +-
 airflow/contrib/hooks/ftp_hook.py | 6 +-
 airflow/contrib/hooks/gcp_api_base_hook.py | 6 +-
 airflow/contrib/hooks/gcp_dataflow_hook.py | 14 +-
 airflow/contrib/hooks/gcp_dataproc_hook.py | 18 +-
 airflow/contrib/hooks/gcp_mlengine_hook.py | 18 +-
 airflow/contrib/hooks/gcs_hook.py | 4 +-
 airflow/contrib/hooks/jira_hook.py | 4 +-
 airflow/contrib/hooks/qubole_hook.py | 12 +-
 airflow/contrib/hooks/redis_hook.py | 6 +-
 airflow/contrib/hooks/salesforce_hook.py | 14 +-
 airflow/contrib/hooks/spark_sql_hook.py | 6 +-
 airflow/contrib/hooks/spark_submit_hook.py | 14 +-
 airflow/contrib/hooks/sqoop_hook.py | 10 +-
 airflow/contrib/hooks/ssh_hook.py | 14 +-
 airflow/contrib/operators/bigquery_operator.py | 2 +-
 .../operators/bigquery_table_delete_operator.py | 2 +-
 .../contrib/operators/bigquery_to_bigquery.py | 2 +-
 airflow/contrib/operators/bigquery_to_gcs.py | 6 +-
 .../contrib/operators/databricks_operator.py | 12 +-
 airflow/contrib/operators/dataproc_operator.py | 16 +-
 .../operators/datastore_export_operator.py | 2 +-
 .../operators/datastore_import_operator.py | 2 +-
 airflow/contrib/operators/ecs_operator.py | 12 +-
 .../contrib/operators/emr_add_steps_operator.py | 4 +-
 .../operators/emr_create_job_flow_operator.py | 4 +-
 .../emr_terminate_job_flow_operator.py | 4 +-
 airflow/contrib/operators/file_to_wasb.py | 2 +-
 airflow/contrib/operators/fs_operator.py | 2 +-
 .../contrib/operators/gcs_download_operator.py | 4 +-
 airflow/contrib/operators/gcs_to_bq.py | 2 +-
 airflow/contrib/operators/hipchat_operator.py | 4 +-
 airflow/contrib/operators/mlengine_operator.py | 14 +-
 airflow/contrib/operators/mysql_to_gcs.py | 2 +-
 airflow/contrib/operators/sftp_operator.py | 4 +-
 airflow/contrib/operators/vertica_operator.py | 2 +-
 airflow/contrib/operators/vertica_to_hive.py | 4 +-
 airflow/contrib/sensors/bigquery_sensor.py | 2 +-
 airflow/contrib/sensors/datadog_sensor.py | 2 +-
 airflow/contrib/sensors/emr_base_sensor.py | 4 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py | 2 +-
 airflow/contrib/sensors/emr_step_sensor.py | 2 +-
 airflow/contrib/sensors/ftp_sensor.py | 2 +-
 airflow/contrib/sensors/gcs_sensor.py | 4 +-
 airflow/contrib/sensors/hdfs_sensors.py | 6 +-
 airflow/contrib/sensors/jira_sensor.py | 14 +-
 airflow/contrib/sensors/redis_key_sensor.py | 2 +-
 airflow/contrib/sensors/wasb_sensor.py | 4 +-
 .../contrib/task_runner/cgroup_task_runner.py | 20 +-
 airflow/executors/__init__.py | 4 +-
 airflow/executors/base_executor.py | 14 +-
 airflow/executors/celery_executor.py | 16 +-
 airflow/executors/dask_executor.py | 4 +-
 airflow/executors/local_executor.py | 6 +-
 airflow/executors/sequential_executor.py | 4 +-
 airflow/hooks/S3_hook.py | 10 +-
 airflow/hooks/base_hook.py | 4 +-
 airflow/hooks/dbapi_hook.py | 6 +-
 airflow/hooks/druid_hook.py | 4 +-
 airflow/hooks/hive_hooks.py | 26 +--
 airflow/hooks/http_hook.py | 6 +-
 airflow/hooks/oracle_hook.py | 8 +-
 airflow/hooks/pig_hook.py | 4 +-
 airflow/hooks/webhdfs_hook.py | 12 +-
 airflow/hooks/zendesk_hook.py | 4 +-
 airflow/jobs.py | 206 +++++++++----------
 airflow/models.py | 126 ++++++------
 airflow/operators/bash_operator.py | 14 +-
 airflow/operators/check_operator.py | 20 +-
 airflow/operators/dagrun_operator.py | 4 +-
 airflow/operators/docker_operator.py | 10 +-
 airflow/operators/generic_transfer.py | 10 +-
 airflow/operators/hive_operator.py | 2 +-
 airflow/operators/hive_stats_operator.py | 8 +-
 airflow/operators/hive_to_druid.py | 10 +-
 airflow/operators/hive_to_mysql.py | 10 +-
 airflow/operators/hive_to_samba_operator.py | 4 +-
 airflow/operators/http_operator.py | 2 +-
 airflow/operators/jdbc_operator.py | 2 +-
 airflow/operators/latest_only_operator.py | 12 +-
 airflow/operators/mssql_operator.py | 2 +-
 airflow/operators/mssql_to_hive.py | 4 +-
 airflow/operators/mysql_operator.py | 2 +-
 airflow/operators/mysql_to_hive.py | 4 +-
 airflow/operators/oracle_operator.py | 2 +-
 airflow/operators/pig_operator.py | 2 +-
 airflow/operators/postgres_operator.py | 2 +-
 airflow/operators/presto_to_mysql.py | 8 +-
 airflow/operators/python_operator.py | 36 ++--
 airflow/operators/redshift_to_s3_operator.py | 6 +-
 airflow/operators/s3_file_transform_operator.py | 12 +-
 airflow/operators/s3_to_hive_operator.py | 24 +--
 airflow/operators/sensors.py | 34 +--
 airflow/operators/slack_operator.py | 2 +-
 airflow/operators/sqlite_operator.py | 2 +-
 airflow/plugins_manager.py | 4 +-
 airflow/security/kerberos.py | 2 +-
 airflow/settings.py | 4 +-
 airflow/task_runner/base_task_runner.py | 10 +-
 airflow/task_runner/bash_task_runner.py | 2 +-
 airflow/utils/dag_processing.py | 18 +-
 airflow/utils/db.py | 4 +-
 airflow/utils/email.py | 4 +-
 airflow/utils/log/LoggingMixin.py | 45 ----
 airflow/utils/log/gcs_task_handler.py | 8 +-
 airflow/utils/log/logging_mixin.py | 61 ++++++
 airflow/utils/log/s3_task_handler.py | 8 +-
 airflow/utils/timeout.py | 12 +-
 airflow/www/api/experimental/endpoints.py | 4 +-
 airflow/www/app.py | 2 +-
 scripts/perf/scheduler_ops_metrics.py | 4 +-
 tests/contrib/hooks/test_databricks_hook.py | 2 +-
 .../contrib/operators/test_dataproc_operator.py | 8 +-
 tests/contrib/sensors/test_hdfs_sensors.py | 62 +++---
 tests/executors/test_executor.py | 4 +-
 tests/operators/sensors.py | 4 +-
 tests/test_utils/reset_warning_registry.py | 82 ++++++++
 tests/utils/log/test_logging.py | 6 +-
 tests/utils/test_logging_mixin.py | 50 +++++
 134 files changed, 858 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
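Before the per-file diffs, it helps to see the mechanism in isolation. The sketch below is not the literal content of the new airflow/utils/log/logging_mixin.py; it is a minimal, self-contained illustration, inferred from the commit message and the diffs that follow, of the pattern this commit applies everywhere: a log property that lazily creates a logger named after the concrete class, plus a deprecated logger alias that warns and delegates to log.

```python
import logging
import warnings


class LoggingMixin(object):
    """Minimal sketch of the renamed mixin; the real
    airflow/utils/log/logging_mixin.py may differ in detail."""

    @property
    def log(self):
        # Lazily create and cache a logger named <module>.<ClassName>.
        try:
            return self._log
        except AttributeError:
            self._log = logging.getLogger(
                self.__class__.__module__ + '.' + self.__class__.__name__)
            return self._log

    @property
    def logger(self):
        # Deprecated spelling: warn, then delegate to the new .log property.
        warnings.warn(
            "LoggingMixin.logger is deprecated; use LoggingMixin.log instead.",
            DeprecationWarning, stacklevel=2)
        return self.log


if __name__ == "__main__":
    class MyHook(LoggingMixin):
        pass

    logging.basicConfig(level=logging.INFO)
    hook = MyHook()
    hook.log.info("new spelling")  # preferred after this commit

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        hook.logger.info("old spelling, still works")
    assert issubclass(caught[0].category, DeprecationWarning)
```

Both spellings keep working: hook.log is the preferred form after this commit, while hook.logger still resolves but raises a DeprecationWarning, which is presumably what the new tests/utils/test_logging_mixin.py and tests/test_utils/reset_warning_registry.py exercise.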
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/__init__.py b/airflow/__init__.py index 8844eeb..3c5f24c 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -21,7 +21,7 @@ in their PYTHONPATH. airflow_login should be based off the """ from builtins import object from airflow import version -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin __version__ = version.version @@ -41,7 +41,7 @@ login = None def load_login(): - log = LoggingMixin().logger + log = LoggingMixin().log auth_backend = 'airflow.default_login' try: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/api/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py index 39edbed..31a303b 100644 --- a/airflow/api/__init__.py +++ b/airflow/api/__init__.py @@ -17,11 +17,11 @@ from airflow.exceptions import AirflowException from airflow import configuration as conf from importlib import import_module -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin api_auth = None -log = LoggingMixin().logger +log = LoggingMixin().log def load_auth(): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/api/auth/backend/kerberos_auth.py ---------------------------------------------------------------------- diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py index 73a5aa2..a904d59 100644 --- a/airflow/api/auth/backend/kerberos_auth.py +++ b/airflow/api/auth/backend/kerberos_auth.py @@ -24,7 +24,7 @@ from future.standard_library import install_aliases -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin install_aliases() @@ -47,7 +47,7 @@ client_auth = HTTPKerberosAuth(service='airflow') _SERVICE_NAME = None -log = LoggingMixin().logger +log = LoggingMixin().log def init_app(app): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 56f1855..5035a66 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -53,7 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance, from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.www.app import cached_app from sqlalchemy import func @@ -64,7 +64,7 @@ api_module = import_module(conf.get('cli', 'api_client')) api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'), auth=api.api_auth.client_auth) -log = LoggingMixin().logger +log = LoggingMixin().log def sigint_handler(sig, frame): @@ -189,7 +189,7 @@ def trigger_dag(args): :param args: :return: """ - log = LoggingMixin().logger + log = LoggingMixin().log try: message = api_client.trigger_dag(dag_id=args.dag_id, run_id=args.run_id, @@ -202,7 +202,7 @@ def trigger_dag(args): def pool(args): - log = LoggingMixin().logger + log = LoggingMixin().log def _tabulate(pools): return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'], @@ -330,7 +330,7 @@ def run(args, 
dag=None): if dag: args.dag_id = dag.dag_id - log = LoggingMixin().logger + log = LoggingMixin().log # Load custom airflow config if args.cfg_path: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index db196f9..ff81d98 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -28,7 +28,7 @@ import sys from future import standard_library -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin standard_library.install_aliases() @@ -38,7 +38,7 @@ from six.moves import configparser from airflow.exceptions import AirflowConfigException -log = LoggingMixin().logger +log = LoggingMixin().log # show Airflow's deprecation warnings warnings.filterwarnings( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/auth/backends/github_enterprise_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index 459e9c9..28c3cfc 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -27,9 +27,9 @@ from flask_oauthlib.client import OAuth from airflow import models, configuration, settings from airflow.configuration import AirflowConfigException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin -log = LoggingMixin().logger +log = LoggingMixin().log def get_config_param(param): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/auth/backends/google_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py index f38f725..e6eab94 100644 --- a/airflow/contrib/auth/backends/google_auth.py +++ b/airflow/contrib/auth/backends/google_auth.py @@ -26,9 +26,9 @@ from flask import url_for, redirect, request from flask_oauthlib.client import OAuth from airflow import models, configuration, settings -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin -log = LoggingMixin().logger +log = LoggingMixin().log def get_config_param(param): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/auth/backends/kerberos_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py index ffb711f..908ebc9 100644 --- a/airflow/contrib/auth/backends/kerberos_auth.py +++ b/airflow/contrib/auth/backends/kerberos_auth.py @@ -29,7 +29,7 @@ from flask import url_for, redirect from airflow import settings from airflow import models from airflow import configuration -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/auth/backends/ldap_auth.py ---------------------------------------------------------------------- diff --git 
a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index 8ce0875..b056851 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ b/airflow/contrib/auth/backends/ldap_auth.py @@ -33,13 +33,13 @@ from airflow.configuration import AirflowConfigException import traceback import re -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below login_manager.login_message = None -log = LoggingMixin().logger +log = LoggingMixin().log class AuthenticationError(Exception): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/auth/backends/password_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py index 3ad2a8b..8adb1f4 100644 --- a/airflow/contrib/auth/backends/password_auth.py +++ b/airflow/contrib/auth/backends/password_auth.py @@ -32,13 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property from airflow import settings from airflow import models -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below login_manager.login_message = None -log = LoggingMixin().logger +log = LoggingMixin().log PY3 = version_info[0] == 3 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/executors/mesos_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py index 19d72ed..8728566 100644 --- a/airflow/contrib/executors/mesos_executor.py +++ b/airflow/contrib/executors/mesos_executor.py @@ -14,7 +14,7 @@ from future import standard_library -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.www.utils import LoginMixin standard_library.install_aliases() @@ -65,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): self.task_key_map = {} def registered(self, driver, frameworkId, masterInfo): - self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value) + self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value) if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error @@ -86,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): Session.remove() def reregistered(self, driver, masterInfo): - self.logger.info("AirflowScheduler re-registered to mesos") + self.log.info("AirflowScheduler re-registered to mesos") def disconnected(self, driver): - self.logger.info("AirflowScheduler disconnected from mesos") + self.log.info("AirflowScheduler disconnected from mesos") def offerRescinded(self, driver, offerId): - self.logger.info("AirflowScheduler offer %s rescinded", str(offerId)) + self.log.info("AirflowScheduler offer %s rescinded", str(offerId)) def frameworkMessage(self, driver, executorId, slaveId, message): - self.logger.info("AirflowScheduler received framework message %s", message) + self.log.info("AirflowScheduler received 
framework message %s", message) def executorLost(self, driver, executorId, slaveId, status): - self.logger.warning("AirflowScheduler executor %s lost", str(executorId)) + self.log.warning("AirflowScheduler executor %s lost", str(executorId)) def slaveLost(self, driver, slaveId): - self.logger.warning("AirflowScheduler slave %s lost", str(slaveId)) + self.log.warning("AirflowScheduler slave %s lost", str(slaveId)) def error(self, driver, message): - self.logger.error("AirflowScheduler driver aborted %s", message) + self.log.error("AirflowScheduler driver aborted %s", message) raise AirflowException("AirflowScheduler driver aborted %s" % message) def resourceOffers(self, driver, offers): @@ -118,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): elif resource.name == "mem": offerMem += resource.scalar.value - self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem) + self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem) remainingCpus = offerCpus remainingMem = offerMem @@ -131,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): self.task_counter += 1 self.task_key_map[str(tid)] = key - self.logger.info("Launching task %d using offer %s", tid, offer.id.value) + self.log.info("Launching task %d using offer %s", tid, offer.id.value) task = mesos_pb2.TaskInfo() task.task_id.value = str(tid) @@ -161,7 +161,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): driver.launchTasks(offer.id, tasks) def statusUpdate(self, driver, update): - self.logger.info( + self.log.info( "Task %s is in state %s, data %s", update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data) ) @@ -171,7 +171,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): except KeyError: # The map may not contain an item if the framework re-registered after a failover. # Discard these tasks. 
- self.logger.warning("Unrecognised task key %s", update.task_id.value) + self.log.warning("Unrecognised task key %s", update.task_id.value) return if update.state == mesos_pb2.TASK_FINISHED: @@ -203,7 +203,7 @@ class MesosExecutor(BaseExecutor, LoginMixin): framework.user = '' if not configuration.get('mesos', 'MASTER'): - self.logger.error("Expecting mesos master URL for mesos executor") + self.log.error("Expecting mesos master URL for mesos executor") raise AirflowException("mesos.master not provided for mesos executor") master = configuration.get('mesos', 'MASTER') @@ -239,7 +239,7 @@ class MesosExecutor(BaseExecutor, LoginMixin): else: framework.checkpoint = False - self.logger.info( + self.log.info( 'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s', master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint) ) @@ -248,10 +248,10 @@ class MesosExecutor(BaseExecutor, LoginMixin): if configuration.getboolean('mesos', 'AUTHENTICATE'): if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'): - self.logger.error("Expecting authentication principal in the environment") + self.log.error("Expecting authentication principal in the environment") raise AirflowException("mesos.default_principal not provided in authenticated mode") if not configuration.get('mesos', 'DEFAULT_SECRET'): - self.logger.error("Expecting authentication secret in the environment") + self.log.error("Expecting authentication secret in the environment") raise AirflowException("mesos.default_secret not provided in authenticated mode") credential = mesos_pb2.Credential() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 497fa28..5fc7e22 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -32,7 +32,7 @@ from past.builtins import basestring from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.hooks.dbapi_hook import DbApiHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): @@ -499,7 +499,7 @@ class BigQueryBaseCursor(LoggingMixin): "'WRITE_APPEND' or 'WRITE_TRUNCATE'." ) else: - self.logger.info( + self.log.info( "Adding experimental " "'schemaUpdateOptions': {0}".format(schema_update_options) ) @@ -576,12 +576,12 @@ class BigQueryBaseCursor(LoggingMixin): ) ) else: - self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id) time.sleep(5) except HttpError as err: if err.resp.status in [500, 503]: - self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) + self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) time.sleep(5) else: raise Exception( @@ -660,14 +660,14 @@ class BigQueryBaseCursor(LoggingMixin): datasetId=deletion_dataset, tableId=deletion_table) \ .execute() - self.logger.info('Deleted table %s:%s.%s.', - deletion_project, deletion_dataset, deletion_table) + self.log.info('Deleted table %s:%s.%s.', + deletion_project, deletion_dataset, deletion_table) except HttpError: if not ignore_if_missing: raise Exception( 'Table deletion failed. 
Table does not exist.') else: - self.logger.info('Table does not exist. Skipping.') + self.log.info('Table does not exist. Skipping.') def run_table_upsert(self, dataset_id, table_resource, project_id=None): @@ -694,7 +694,7 @@ class BigQueryBaseCursor(LoggingMixin): for table in tables_list_resp.get('tables', []): if table['tableReference']['tableId'] == table_id: # found the table, do update - self.logger.info( + self.log.info( 'Table %s:%s.%s exists, updating.', project_id, dataset_id, table_id ) @@ -712,7 +712,7 @@ class BigQueryBaseCursor(LoggingMixin): # If there is no next page, then the table doesn't exist. else: # do insert - self.logger.info( + self.log.info( 'Table %s:%s.%s does not exist. creating.', project_id, dataset_id, table_id ) @@ -759,7 +759,7 @@ class BigQueryBaseCursor(LoggingMixin): 'tableId': view_table}} # check to see if the view we want to add already exists. if view_access not in access: - self.logger.info( + self.log.info( 'Granting table %s:%s.%s authorized view access to %s:%s dataset.', view_project, view_dataset, view_table, source_project, source_dataset ) @@ -769,7 +769,7 @@ class BigQueryBaseCursor(LoggingMixin): body={'access': access}).execute() else: # if view is already in access, do nothing. - self.logger.info( + self.log.info( 'Table %s:%s.%s already has authorized view access to %s:%s dataset.', view_project, view_dataset, view_table, source_project, source_dataset ) @@ -1032,7 +1032,7 @@ def _split_tablename(table_input, default_project_id, var_name=None): if project_id is None: if var_name is not None: - log = LoggingMixin().logger + log = LoggingMixin().log log.info( 'Project not included in {var}: {input}; using project "{project}"'.format( var=var_name, input=table_input, project=default_project_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/cloudant_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py index d9db08d..cbb0cca 100644 --- a/airflow/contrib/hooks/cloudant_hook.py +++ b/airflow/contrib/hooks/cloudant_hook.py @@ -18,7 +18,7 @@ import cloudant from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class CloudantHook(BaseHook): @@ -35,7 +35,7 @@ class CloudantHook(BaseHook): def _str(s): # cloudant-python doesn't support unicode. if isinstance(s, unicode): - log = LoggingMixin().logger + log = LoggingMixin().log log.debug( 'cloudant-python does not support unicode. 
Encoding %s as ascii using "ignore".', s http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/databricks_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py index 7b20433..cd9dc54 100644 --- a/airflow/contrib/hooks/databricks_hook.py +++ b/airflow/contrib/hooks/databricks_hook.py @@ -20,7 +20,7 @@ from airflow.hooks.base_hook import BaseHook from requests import exceptions as requests_exceptions from requests.auth import AuthBase -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin try: from urllib import parse as urlparse @@ -100,10 +100,10 @@ class DatabricksHook(BaseHook, LoggingMixin): host=self._parse_host(self.databricks_conn.host), endpoint=endpoint) if 'token' in self.databricks_conn.extra_dejson: - self.logger.info('Using token auth.') + self.log.info('Using token auth.') auth = _TokenAuth(self.databricks_conn.extra_dejson['token']) else: - self.logger.info('Using basic auth.') + self.log.info('Using basic auth.') auth = (self.databricks_conn.login, self.databricks_conn.password) if method == 'GET': request_func = requests.get @@ -129,7 +129,7 @@ class DatabricksHook(BaseHook, LoggingMixin): response.content, response.status_code)) except (requests_exceptions.ConnectionError, requests_exceptions.Timeout) as e: - self.logger.error( + self.log.error( 'Attempt %s API Request to Databricks failed with reason: %s', attempt_num, e ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/datadog_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py index 0f5af00..6caf611 100644 --- a/airflow/contrib/hooks/datadog_hook.py +++ b/airflow/contrib/hooks/datadog_hook.py @@ -17,7 +17,7 @@ from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException from datadog import initialize, api -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class DatadogHook(BaseHook, LoggingMixin): @@ -47,7 +47,7 @@ class DatadogHook(BaseHook, LoggingMixin): if self.app_key is None: raise AirflowException("app_key must be specified in the Datadog connection details") - self.logger.info("Setting up api keys for Datadog") + self.log.info("Setting up api keys for Datadog") options = { 'api_key': self.api_key, 'app_key': self.app_key @@ -56,7 +56,7 @@ class DatadogHook(BaseHook, LoggingMixin): def validate_response(self, response): if response['status'] != 'ok': - self.logger.error("Datadog returned: %s", response) + self.log.error("Datadog returned: %s", response) raise AirflowException("Error status received from Datadog") def send_metric(self, metric_name, datapoint, tags=None): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/datastore_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py index 2ff1600..cf98dc7 100644 --- a/airflow/contrib/hooks/datastore_hook.py +++ b/airflow/contrib/hooks/datastore_hook.py @@ -136,8 +136,8 @@ class DatastoreHook(GoogleCloudBaseHook): result = self.get_operation(name) state = result['metadata']['common']['state'] if state == 'PROCESSING': - 
self.logger.info('Operation is processing. Re-polling state in {} seconds' - .format(polling_interval_in_seconds)) + self.log.info('Operation is processing. Re-polling state in {} seconds' + .format(polling_interval_in_seconds)) time.sleep(polling_interval_in_seconds) else: return result http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/ftp_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py index a6b3181..b1e224d 100644 --- a/airflow/contrib/hooks/ftp_hook.py +++ b/airflow/contrib/hooks/ftp_hook.py @@ -19,7 +19,7 @@ import os.path from airflow.hooks.base_hook import BaseHook from past.builtins import basestring -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin def mlsd(conn, path="", facts=None): @@ -167,9 +167,9 @@ class FTPHook(BaseHook, LoggingMixin): remote_path, remote_file_name = os.path.split(remote_full_path) conn.cwd(remote_path) - self.logger.info('Retrieving file from FTP: %s', remote_full_path) + self.log.info('Retrieving file from FTP: %s', remote_full_path) conn.retrbinary('RETR %s' % remote_file_name, output_handle.write) - self.logger.info('Finished retrieving file from FTP: %s', remote_full_path) + self.log.info('Finished retrieving file from FTP: %s', remote_full_path) if is_path: output_handle.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/gcp_api_base_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py index 7476c90..28721d3 100644 --- a/airflow/contrib/hooks/gcp_api_base_hook.py +++ b/airflow/contrib/hooks/gcp_api_base_hook.py @@ -18,7 +18,7 @@ from oauth2client.service_account import ServiceAccountCredentials from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class GoogleCloudBaseHook(BaseHook, LoggingMixin): @@ -66,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin): kwargs['sub'] = self.delegate_to if not key_path: - self.logger.info('Getting connection using `gcloud auth` user, since no key file ' + self.log.info('Getting connection using `gcloud auth` user, since no key file ' 'is defined for hook.') credentials = GoogleCredentials.get_application_default() else: @@ -74,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin): raise AirflowException('Scope should be defined when using a key file.') scopes = [s.strip() for s in scope.split(',')] if key_path.endswith('.json'): - self.logger.info('Getting connection using a JSON key file.') + self.log.info('Getting connection using a JSON key file.') credentials = ServiceAccountCredentials\ .from_json_keyfile_name(key_path, scopes) elif key_path.endswith('.p12'): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/gcp_dataflow_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py index 457fa37..60ea5b9 100644 --- a/airflow/contrib/hooks/gcp_dataflow_hook.py +++ b/airflow/contrib/hooks/gcp_dataflow_hook.py @@ -19,7 +19,7 @@ import uuid from apiclient.discovery import build from 
airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class _DataflowJob(LoggingMixin): @@ -47,12 +47,12 @@ class _DataflowJob(LoggingMixin): job = self._dataflow.projects().jobs().get(projectId=self._project_number, jobId=self._job_id).execute() if 'currentState' in job: - self.logger.info( + self.log.info( 'Google Cloud DataFlow job %s is %s', job['name'], job['currentState'] ) else: - self.logger.info( + self.log.info( 'Google Cloud DataFlow with job_id %s has name %s', self._job_id, job['name'] ) @@ -74,7 +74,7 @@ class _DataflowJob(LoggingMixin): elif 'JOB_STATE_PENDING' == self._job['currentState']: time.sleep(15) else: - self.logger.debug(str(self._job)) + self.log.debug(str(self._job)) raise Exception( "Google Cloud Dataflow job {} was unknown state: {}".format( self._job['name'], self._job['currentState'])) @@ -108,15 +108,15 @@ class _Dataflow(LoggingMixin): def wait_for_done(self): reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()] - self.logger.info("Start waiting for DataFlow process to complete.") + self.log.info("Start waiting for DataFlow process to complete.") while self._proc.poll() is None: ret = select.select(reads, [], [], 5) if ret is not None: for fd in ret[0]: line = self._line(fd) - self.logger.debug(line[:-1]) + self.log.debug(line[:-1]) else: - self.logger.info("Waiting for DataFlow process to complete.") + self.log.info("Waiting for DataFlow process to complete.") if self._proc.returncode is not 0: raise Exception("DataFlow failed with return code {}".format( self._proc.returncode)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index 3a1336e..c964f4c 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -18,7 +18,7 @@ import uuid from apiclient.discovery import build from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class _DataProcJob(LoggingMixin): @@ -30,7 +30,7 @@ class _DataProcJob(LoggingMixin): region='global', body=job).execute() self.job_id = self.job['reference']['jobId'] - self.logger.info( + self.log.info( 'DataProc job %s is %s', self.job_id, str(self.job['status']['state']) ) @@ -43,20 +43,20 @@ class _DataProcJob(LoggingMixin): jobId=self.job_id).execute() if 'ERROR' == self.job['status']['state']: print(str(self.job)) - self.logger.error('DataProc job %s has errors', self.job_id) - self.logger.error(self.job['status']['details']) - self.logger.debug(str(self.job)) + self.log.error('DataProc job %s has errors', self.job_id) + self.log.error(self.job['status']['details']) + self.log.debug(str(self.job)) return False if 'CANCELLED' == self.job['status']['state']: print(str(self.job)) - self.logger.warning('DataProc job %s is cancelled', self.job_id) + self.log.warning('DataProc job %s is cancelled', self.job_id) if 'details' in self.job['status']: - self.logger.warning(self.job['status']['details']) - self.logger.debug(str(self.job)) + self.log.warning(self.job['status']['details']) + self.log.debug(str(self.job)) return False if 'DONE' == self.job['status']['state']: return True - 
self.logger.debug( + self.log.debug( 'DataProc job %s is %s', self.job_id, str(self.job['status']['state']) ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/gcp_mlengine_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py index 35f31a7..c17b614 100644 --- a/airflow/contrib/hooks/gcp_mlengine_hook.py +++ b/airflow/contrib/hooks/gcp_mlengine_hook.py @@ -20,11 +20,11 @@ from apiclient.discovery import build from oauth2client.client import GoogleCredentials from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func): - log = LoggingMixin().logger + log = LoggingMixin().log for i in range(0, max_n): try: @@ -103,18 +103,18 @@ class MLEngineHook(GoogleCloudBaseHook): if use_existing_job_fn is not None: existing_job = self._get_job(project_id, job_id) if not use_existing_job_fn(existing_job): - self.logger.error( + self.log.error( 'Job with job_id %s already exist, but it does ' 'not match our expectation: %s', job_id, existing_job ) raise - self.logger.info( + self.log.info( 'Job with job_id %s already exist. Will waiting for it to finish', job_id ) else: - self.logger.error('Failed to create MLEngine job: {}'.format(e)) + self.log.error('Failed to create MLEngine job: {}'.format(e)) raise return self._wait_for_job_done(project_id, job_id) @@ -139,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook): # polling after 30 seconds when quota failure occurs time.sleep(30) else: - self.logger.error('Failed to get MLEngine job: {}'.format(e)) + self.log.error('Failed to get MLEngine job: {}'.format(e)) raise def _wait_for_job_done(self, project_id, job_id, interval=30): @@ -191,10 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook): try: response = request.execute() - self.logger.info('Successfully set version: %s to default', response) + self.log.info('Successfully set version: %s to default', response) return response except errors.HttpError as e: - self.logger.error('Something went wrong: %s', e) + self.log.error('Something went wrong: %s', e) raise def list_versions(self, project_id, model_name): @@ -262,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook): return request.execute() except errors.HttpError as e: if e.resp.status == 404: - self.logger.error('Model was not found: %s', e) + self.log.error('Model was not found: %s', e) return None raise http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/gcs_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index eb17c3b..24c247e 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -182,7 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): ts = ts.replace(tzinfo=dateutil.tz.tzutc()) updated = dateutil.parser.parse(response['updated']) - self.logger.info("Verify object date: %s > %s", updated, ts) + self.log.info("Verify object date: %s > %s", updated, ts) if updated > ts: return True @@ -247,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook): ).execute() if 'items' not in response: - self.logger.info("No items found for prefix: %s", prefix) + self.log.info("No items found for 
prefix: %s", prefix) break for item in response['items']: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/jira_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py index 8702608..21e669f 100644 --- a/airflow/contrib/hooks/jira_hook.py +++ b/airflow/contrib/hooks/jira_hook.py @@ -16,7 +16,7 @@ from jira.exceptions import JIRAError from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class JiraHook(BaseHook, LoggingMixin): @@ -35,7 +35,7 @@ class JiraHook(BaseHook, LoggingMixin): def get_conn(self): if not self.client: - self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id) + self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id) get_server_info = True validate = True http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/qubole_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index 1a5e7ec..833c1c7 100755 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -21,7 +21,7 @@ import six from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow import configuration -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State from qds_sdk.qubole import Qubole @@ -86,7 +86,7 @@ class QuboleHook(BaseHook, LoggingMixin): if cmd_id is not None: cmd = Command.find(cmd_id) if cmd is not None: - log = LoggingMixin().logger + log = LoggingMixin().log if cmd.status == 'done': log.info('Command ID: %s has been succeeded, hence marking this ' 'TI as Success.', cmd_id) @@ -99,7 +99,7 @@ class QuboleHook(BaseHook, LoggingMixin): args = self.cls.parse(self.create_cmd_args(context)) self.cmd = self.cls.create(**args) context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id) - self.logger.info( + self.log.info( "Qubole command created with Id: %s and Status: %s", self.cmd.id, self.cmd.status ) @@ -107,10 +107,10 @@ class QuboleHook(BaseHook, LoggingMixin): while not Command.is_done(self.cmd.status): time.sleep(Qubole.poll_interval) self.cmd = self.cls.find(self.cmd.id) - self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status) + self.log.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status) if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True: - self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log()) + self.log.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log()) if self.cmd.status != 'done': raise AirflowException('Command Id: {0} failed with Status: {1}'.format( @@ -126,7 +126,7 @@ class QuboleHook(BaseHook, LoggingMixin): cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id) self.cmd = self.cls.find(cmd_id) if self.cls and self.cmd: - self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id) + self.log.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id) self.cmd.cancel() def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/redis_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py index a8999d6..278e196 100644 --- a/airflow/contrib/hooks/redis_hook.py +++ b/airflow/contrib/hooks/redis_hook.py @@ -19,7 +19,7 @@ from redis import StrictRedis from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class RedisHook(BaseHook, LoggingMixin): @@ -41,7 +41,7 @@ class RedisHook(BaseHook, LoggingMixin): self.password = conn.password self.db = int(conn.extra_dejson.get('db', 0)) - self.logger.debug( + self.log.debug( '''Connection "{conn}": \thost: {host} \tport: {port} @@ -59,7 +59,7 @@ class RedisHook(BaseHook, LoggingMixin): Returns a Redis connection. """ if not self.client: - self.logger.debug( + self.log.debug( 'generating Redis client for conn_id "%s" on %s:%s:%s', self.redis_conn_id, self.host, self.port, self.db ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/salesforce_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py index f2b5fef..0d0a104 100644 --- a/airflow/contrib/hooks/salesforce_hook.py +++ b/airflow/contrib/hooks/salesforce_hook.py @@ -29,7 +29,7 @@ import json import pandas as pd import time -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SalesforceHook(BaseHook, LoggingMixin): @@ -92,10 +92,10 @@ class SalesforceHook(BaseHook, LoggingMixin): """ self.sign_in() - self.logger.info("Querying for all objects") + self.log.info("Querying for all objects") query = self.sf.query_all(query) - self.logger.info( + self.log.info( "Received results: Total size: %s; Done: %s", query['totalSize'], query['done'] ) @@ -144,7 +144,7 @@ class SalesforceHook(BaseHook, LoggingMixin): field_string = self._build_field_list(fields) query = "SELECT {0} FROM {1}".format(field_string, obj) - self.logger.info( + self.log.info( "Making query to Salesforce: %s", query if len(query) < 30 else " ... 
".join([query[:15], query[-15:]]) ) @@ -169,7 +169,7 @@ class SalesforceHook(BaseHook, LoggingMixin): try: col = pd.to_datetime(col) except ValueError: - log = LoggingMixin().logger + log = LoggingMixin().log log.warning( "Could not convert field to timestamps: %s", col.name ) @@ -265,7 +265,7 @@ class SalesforceHook(BaseHook, LoggingMixin): # for each returned record object_name = query_results[0]['attributes']['type'] - self.logger.info("Coercing timestamps for: %s", object_name) + self.log.info("Coercing timestamps for: %s", object_name) schema = self.describe_object(object_name) @@ -299,7 +299,7 @@ class SalesforceHook(BaseHook, LoggingMixin): # there are also a ton of newline objects # that mess up our ability to write to csv # we remove these newlines so that the output is a valid CSV format - self.logger.info("Cleaning data and writing to CSV") + self.log.info("Cleaning data and writing to CSV") possible_strings = df.columns[df.dtypes == "object"] df[possible_strings] = df[possible_strings].apply( lambda x: x.str.replace("\r\n", "") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/spark_sql_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py index aa16130..6973023 100644 --- a/airflow/contrib/hooks/spark_sql_hook.py +++ b/airflow/contrib/hooks/spark_sql_hook.py @@ -16,7 +16,7 @@ import subprocess from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SparkSqlHook(BaseHook, LoggingMixin): @@ -121,7 +121,7 @@ class SparkSqlHook(BaseHook, LoggingMixin): connection_cmd += ["--queue", self._yarn_queue] connection_cmd += cmd - self.logger.debug("Spark-Sql cmd: %s", connection_cmd) + self.log.debug("Spark-Sql cmd: %s", connection_cmd) return connection_cmd @@ -151,5 +151,5 @@ class SparkSqlHook(BaseHook, LoggingMixin): def kill(self): if self._sp and self._sp.poll() is None: - self.logger.info("Killing the Spark-Sql job") + self.log.info("Killing the Spark-Sql job") self._sp.kill() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/spark_submit_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index bdd1efe..7d59cd2 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -18,7 +18,7 @@ import re from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SparkSubmitHook(BaseHook, LoggingMixin): @@ -123,7 +123,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): conn_data['spark_home'] = extra.get('spark-home', None) conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit') except AirflowException: - self.logger.debug( + self.log.debug( "Could not load connection string %s, defaulting to %s", self._conn_id, conn_data['master'] ) @@ -192,7 +192,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): if self._application_args: connection_cmd += self._application_args - self.logger.debug("Spark-Submit cmd: %s", connection_cmd) + self.log.debug("Spark-Submit cmd: %s", connection_cmd) return 
connection_cmd @@ -239,15 +239,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin): self._yarn_application_id = match.groups()[0] # Pass to logging - self.logger.info(line) + self.log.info(line) def on_kill(self): if self._sp and self._sp.poll() is None: - self.logger.info('Sending kill signal to %s', self._connection['spark_binary']) + self.log.info('Sending kill signal to %s', self._connection['spark_binary']) self._sp.kill() if self._yarn_application_id: - self.logger.info('Killing application on YARN') + self.log.info('Killing application on YARN') kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split() yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.logger.info("YARN killed with return code: %s", yarn_kill.wait()) + self.log.info("YARN killed with return code: %s", yarn_kill.wait()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/sqoop_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py index 0584df4..5b00b15 100644 --- a/airflow/contrib/hooks/sqoop_hook.py +++ b/airflow/contrib/hooks/sqoop_hook.py @@ -20,7 +20,7 @@ import subprocess from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SqoopHook(BaseHook, LoggingMixin): @@ -76,7 +76,7 @@ class SqoopHook(BaseHook, LoggingMixin): password_index = cmd.index('--password') cmd[password_index + 1] = 'MASKED' except ValueError: - self.logger.debug("No password in sqoop cmd") + self.log.debug("No password in sqoop cmd") return cmd def Popen(self, cmd, **kwargs): @@ -87,18 +87,18 @@ class SqoopHook(BaseHook, LoggingMixin): :param kwargs: extra arguments to Popen (see subprocess.Popen) :return: handle to subprocess """ - self.logger.info("Executing command: %s", ' '.join(cmd)) + self.log.info("Executing command: %s", ' '.join(cmd)) sp = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kwargs) for line in iter(sp.stdout): - self.logger.info(line.strip()) + self.log.info(line.strip()) sp.wait() - self.logger.info("Command exited with return code %s", sp.returncode) + self.log.info("Command exited with return code %s", sp.returncode) if sp.returncode: raise AirflowException("Sqoop command failed: %s", ' '.join(cmd)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/hooks/ssh_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py index 3fe9146..b061fd7 100755 --- a/airflow/contrib/hooks/ssh_hook.py +++ b/airflow/contrib/hooks/ssh_hook.py @@ -23,7 +23,7 @@ import paramiko from contextlib import contextmanager from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook -from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.utils.log.logging_mixin import LoggingMixin class SSHHook(BaseHook, LoggingMixin): @@ -70,7 +70,7 @@ class SSHHook(BaseHook, LoggingMixin): def get_conn(self): if not self.client: - self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id) + self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id) if self.ssh_conn_id is not None: conn = self.get_connection(self.ssh_conn_id) if self.username is None: @@ 
-98,7 +98,7 @@ class SSHHook(BaseHook, LoggingMixin): # Auto detecting username values from system if not self.username: - self.logger.debug( + self.log.debug( "username to ssh to host: %s is not specified for connection id" " %s. Using system's default provided by getpass.getuser()", self.remote_host, self.ssh_conn_id @@ -142,17 +142,17 @@ class SSHHook(BaseHook, LoggingMixin): self.client = client except paramiko.AuthenticationException as auth_error: - self.logger.error( + self.log.error( "Auth failed while connecting to host: %s, error: %s", self.remote_host, auth_error ) except paramiko.SSHException as ssh_error: - self.logger.error( + self.log.error( "Failed connecting to host: %s, error: %s", self.remote_host, ssh_error ) except Exception as error: - self.logger.error( + self.log.error( "Error connecting to host: %s, error: %s", self.remote_host, error ) @@ -191,7 +191,7 @@ class SSHHook(BaseHook, LoggingMixin): ] ssh_cmd += ssh_tunnel_cmd - self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd) + self.log.debug("Creating tunnel with cmd: %s", ssh_cmd) proc = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 37e4a97..a2ba824 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -87,7 +87,7 @@ class BigQueryOperator(BaseOperator): self.query_params = query_params def execute(self, context): - self.logger.info('Executing: %s', self.bql) + self.log.info('Executing: %s', self.bql) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) conn = hook.get_conn() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/bigquery_table_delete_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py index 21de7cc..0f4ef50 100644 --- a/airflow/contrib/operators/bigquery_table_delete_operator.py +++ b/airflow/contrib/operators/bigquery_table_delete_operator.py @@ -53,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator): self.ignore_if_missing = ignore_if_missing def execute(self, context): - self.logger.info('Deleting: %s', self.deletion_dataset_table) + self.log.info('Deleting: %s', self.deletion_dataset_table) hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to) conn = hook.get_conn() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/bigquery_to_bigquery.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py index 8e21270..2bc4a8b 100644 --- a/airflow/contrib/operators/bigquery_to_bigquery.py +++ b/airflow/contrib/operators/bigquery_to_bigquery.py @@ -68,7 +68,7 @@ class BigQueryToBigQueryOperator(BaseOperator): self.delegate_to = delegate_to def execute(self, context): - self.logger.info( + self.log.info( 'Executing copy of %s into: %s', self.source_project_dataset_tables, self.destination_project_dataset_table ) 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 23a2029..800e7bd 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -79,9 +79,9 @@ class BigQueryToCloudStorageOperator(BaseOperator):
         self.delegate_to = delegate_to

     def execute(self, context):
-        self.logger.info('Executing extract of %s into: %s',
-                         self.source_project_dataset_table,
-                         self.destination_cloud_storage_uris)
+        self.log.info('Executing extract of %s into: %s',
+                      self.source_project_dataset_table,
+                      self.destination_cloud_storage_uris)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8773357..cffc4ff 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -214,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             raise AirflowException(msg)

     def _log_run_page_url(self, url):
-        self.logger.info('View run status, Spark UI, and logs at %s', url)
+        self.log.info('View run status, Spark UI, and logs at %s', url)

     def get_hook(self):
         return DatabricksHook(
@@ -225,13 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
         run_page_url = hook.get_run_page_url(self.run_id)
-        self.logger.info('Run submitted with run_id: %s', self.run_id)
+        self.log.info('Run submitted with run_id: %s', self.run_id)
         self._log_run_page_url(run_page_url)
         while True:
             run_state = hook.get_run_state(self.run_id)
             if run_state.is_terminal:
                 if run_state.is_successful:
-                    self.logger.info('%s completed successfully.', self.task_id)
+                    self.log.info('%s completed successfully.', self.task_id)
                     self._log_run_page_url(run_page_url)
                     return
                 else:
@@ -240,15 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
                         s=run_state)
                     raise AirflowException(error_message)
             else:
-                self.logger.info('%s in run state: %s', self.task_id, run_state)
+                self.log.info('%s in run state: %s', self.task_id, run_state)
                 self._log_run_page_url(run_page_url)
-                self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
+                self.log.info('Sleeping for %s seconds.', self.polling_period_seconds)
                 time.sleep(self.polling_period_seconds)

     def on_kill(self):
         hook = self.get_hook()
         hook.cancel_run(self.run_id)
-        self.logger.info(
+        self.log.info(
             'Task: %s with run_id: %s was requested to be cancelled.',
             self.task_id, self.run_id
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3c22b60..bdb0335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -177,12 +177,12 @@ class DataprocClusterCreateOperator(BaseOperator):
         while True:
             state = self._get_cluster_state(service)
             if state is None:
-                self.logger.info("No state for cluster '%s'", self.cluster_name)
+                self.log.info("No state for cluster '%s'", self.cluster_name)
                 time.sleep(15)
             else:
-                self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
+                self.log.info("State for cluster '%s' is %s", self.cluster_name, state)
                 if self._cluster_ready(state, service):
-                    self.logger.info(
+                    self.log.info(
                         "Cluster '%s' successfully created", self.cluster_name
                     )
                     return
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         return cluster_data

     def execute(self, context):
-        self.logger.info('Creating cluster: %s', self.cluster_name)
+        self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -272,7 +272,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         service = hook.get_conn()

         if self._get_cluster(service):
-            self.logger.info(
+            self.log.info(
                 'Cluster %s already exists... Checking status...',
                 self.cluster_name
             )
@@ -290,7 +290,7 @@ class DataprocClusterCreateOperator(BaseOperator):
             # probably two cluster start commands at the same time
             time.sleep(10)
             if self._get_cluster(service):
-                self.logger.info(
+                self.log.info(
                     'Cluster {} already exists... Checking status...',
                     self.cluster_name
                 )
@@ -358,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
         time.sleep(15)

     def execute(self, context):
-        self.logger.info('Deleting cluster: %s', self.cluster_name)
+        self.log.info('Deleting cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -371,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             clusterName=self.cluster_name
         ).execute()
         operation_name = response['name']
-        self.logger.info("Cluster delete operation name: %s", operation_name)
+        self.log.info("Cluster delete operation name: %s", operation_name)
         self._wait_for_done(service, operation_name)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 76415e1..51e1d06 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -78,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
         self.xcom_push = xcom_push

     def execute(self, context):
-        self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+        self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)

         if self.overwrite_existing and self.namespace:
             gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 74bd940..d8c42e7 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -72,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
         self.xcom_push = xcom_push

     def execute(self, context):
-        self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
+        self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
         ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
         result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
                                                     file=self.file,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 0c75eaa..898a77a 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -56,11 +56,11 @@ class ECSOperator(BaseOperator):
         self.hook = self.get_hook()

     def execute(self, context):
-        self.logger.info(
+        self.log.info(
             'Running ECS Task - Task definition: %s - on cluster %s',
             self.task_definition,self.cluster
         )
-        self.logger.info('ECSOperator overrides: %s', self.overrides)
+        self.log.info('ECSOperator overrides: %s', self.overrides)

         self.client = self.hook.get_client_type(
             'ecs',
@@ -77,13 +77,13 @@ class ECSOperator(BaseOperator):
         failures = response['failures']
         if len(failures) > 0:
             raise AirflowException(response)
-        self.logger.info('ECS Task started: %s', response)
+        self.log.info('ECS Task started: %s', response)

         self.arn = response['tasks'][0]['taskArn']
         self._wait_for_task_ended()

         self._check_success_task()
-        self.logger.info('ECS Task has been successfully executed: %s', response)
+        self.log.info('ECS Task has been successfully executed: %s', response)

     def _wait_for_task_ended(self):
         waiter = self.client.get_waiter('tasks_stopped')
@@ -98,7 +98,7 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             tasks=[self.arn]
         )
-        self.logger.info('ECS Task stopped, check status: %s', response)
+        self.log.info('ECS Task stopped, check status: %s', response)

         if len(response.get('failures', [])) > 0:
             raise AirflowException(response)
@@ -124,4 +124,4 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             task=self.arn,
             reason='Task killed by the user')
-        self.logger.info(response)
+        self.log.info(response)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index dbf764e..227474e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -48,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

-        self.logger.info('Adding steps to %s', self.job_flow_id)
+        self.log.info('Adding steps to %s', self.job_flow_id)
         response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)

         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('Adding steps failed: %s' % response)
         else:
-            self.logger.info('Steps %s added to JobFlow', response['StepIds'])
+            self.log.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 4e40b17..2544adf 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -50,7 +50,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)

-        self.logger.info(
+        self.log.info(
             'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
             self.aws_conn_id, self.emr_conn_id
         )
@@ -59,5 +59,5 @@ class EmrCreateJobFlowOperator(BaseOperator):
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow creation failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s created', response['JobFlowId'])
+            self.log.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index df641ad..ec29897 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -43,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

-        self.logger.info('Terminating JobFlow %s', self.job_flow_id)
+        self.log.info('Terminating JobFlow %s', self.job_flow_id)
         response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])

         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow termination failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
+            self.log.info('JobFlow with id %s terminated', self.job_flow_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 4519e1e..3478dd3 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -51,7 +51,7 @@ class FileToWasbOperator(BaseOperator):
     def execute(self, context):
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
-        self.logger.info(
+        self.log.info(
             'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
         )
         hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index ca7d716..e7640c8 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -48,7 +48,7 @@ class FileSensor(BaseSensorOperator):
         hook = FSHook(self.fs_conn_id)
         basepath = hook.get_path()
         full_path = "/".join([basepath, self.filepath])
-        self.logger.info('Poking for file {full_path}'.format(**locals()))
+        self.log.info('Poking for file {full_path}'.format(**locals()))
         try:
             files = [f for f in walk(full_path)]
         except:
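Note: the Databricks, Dataproc, ECS and EMR hunks above all follow the same submit-then-poll shape. A distilled sketch of that loop is below; get_state, is_terminal and is_successful are hypothetical stand-ins for the hook methods, not real Airflow APIs:

    import logging
    import time

    log = logging.getLogger(__name__)

    def wait_for_terminal_state(get_state, is_terminal, is_successful,
                                polling_period_seconds):
        """Poll a remote job until it reaches a terminal state.

        The three callables are illustrative stand-ins for whatever the
        hook exposes (e.g. get_run_state / run_state.is_terminal above).
        """
        while True:
            state = get_state()
            if is_terminal(state):
                if not is_successful(state):
                    raise RuntimeError('Job finished in failed state: %s' % state)
                log.info('Job completed successfully.')
                return state
            # Not terminal yet: report progress lazily, then back off.
            log.info('Job in state: %s', state)
            log.info('Sleeping for %s seconds.', polling_period_seconds)
            time.sleep(polling_period_seconds)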
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 27e85b7..53516b1 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -65,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self.delegate_to = delegate_to

     def execute(self, context):
-        self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+        self.log.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
         hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                       delegate_to=self.delegate_to)
         file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -74,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
             context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
         else:
             raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
-        self.logger.info(file_bytes)
+        self.log.info(file_bytes)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 01f53cc..730a3bc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -189,7 +189,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                 self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            self.logger.info(
+            self.log.info(
                 'Loaded BQ data with max %s.%s=%s',
                 self.destination_project_dataset_table, self.max_id_key, max_id
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 19c6d76..d82ad61 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -66,8 +66,8 @@ class HipChatAPIOperator(BaseOperator):
                                      'Authorization': 'Bearer %s' % self.token},
                                 data=self.body)
         if response.status_code >= 400:
-            self.logger.error('HipChat API call failed: %s %s',
-                              response.status_code, response.reason)
+            self.log.error('HipChat API call failed: %s %s',
+                           response.status_code, response.reason)
             raise AirflowException('HipChat API call failed: %s %s' %
                                    (response.status_code, response.reason))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index fdbfede..4d8943b 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -22,9 +22,9 @@ from airflow.operators import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from apiclient import errors

-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin

-log = LoggingMixin().logger
+log = LoggingMixin().log


 def _create_prediction_input(project_id,
@@ -225,7 +225,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
                 model_name, version_name, uri,
                 max_worker_count, runtime_version)
         except ValueError as e:
-            self.logger.error(
+            self.log.error(
                 'Cannot create batch prediction job request due to: %s',
                 e
             )
@@ -251,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
             raise

         if finished_prediction_job['state'] != 'SUCCEEDED':
-            self.logger.error(
+            self.log.error(
                 'Batch prediction job failed: %s',
                 str(finished_prediction_job))
             raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -538,8 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
         }

         if self._mode == 'DRY_RUN':
-            self.logger.info('In dry_run mode.')
-            self.logger.info('MLEngine Training job request is: {}'.format(training_request))
+            self.log.info('In dry_run mode.')
+            self.log.info('MLEngine Training job request is: {}'.format(training_request))
             return

         hook = MLEngineHook(
@@ -557,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
             raise

         if finished_training_job['state'] != 'SUCCEEDED':
-            self.logger.error('MLEngine training job failed: {}'.format(
+            self.log.error('MLEngine training job failed: {}'.format(
                 str(finished_training_job)))
             raise RuntimeError(finished_training_job['errorMessage'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af405084/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f7b3a5a..c8ebcd0 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -168,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                 'mode': field_mode,
             })

-        self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
+        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}
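Note: the mlengine_operator.py hunk above shows both halves of the rename at once: the import moves from the LoggingMixin module to logging_mixin, and the attribute moves from .logger to .log. For orientation, here is one plausible shape for such a mixin, sketched with an illustrative backwards-compatibility shim; this is an assumption for illustration, not the actual Airflow implementation:

    import logging
    import warnings

    class LoggingMixin(object):
        """Expose a per-class logger as `log` (illustrative sketch)."""

        @property
        def log(self):
            # Name the logger after the concrete class so every hook and
            # operator gets its own logging namespace.
            return logging.getLogger(
                '{}.{}'.format(self.__class__.__module__,
                               self.__class__.__name__))

        @property
        def logger(self):
            # Hypothetical deprecation alias: warn once per call site,
            # then delegate to the new attribute.
            warnings.warn('`logger` is deprecated, use `log` instead',
                          DeprecationWarning, stacklevel=2)
            return self.log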