[AIRFLOW-1582] Improve logging within Airflow

Clean up the way logging is done within Airflow. Remove the old airflow/utils/logging.py and move to the airflow.utils.log.* interface. Stop configuring logging outside of the settings/configuration code. Move away from eager string formatting to logging_function(msg, *args), so messages are only interpolated when they are actually emitted.
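For readers skimming the diff, here is a minimal sketch of the two conventions the commit moves between. The `MyHook` class and its method are invented for illustration; `LoggingMixin`, its `logger` property, and the `airflow.utils.log.LoggingMixin` import path are the ones used throughout the hunks below, so the snippet assumes an Airflow checkout at this commit.

```python
import logging

from airflow.utils.log.LoggingMixin import LoggingMixin


class MyHook(LoggingMixin):
    """Hypothetical hook, used only to contrast the two logging styles."""

    def fetch(self, path):
        # Old style: module-level logging plus eager string formatting;
        # the message is rendered even if the log level would discard it.
        logging.info("Fetching {}".format(path))

        # New style: the logger comes from LoggingMixin and arguments are
        # passed separately, so interpolation is deferred until emission.
        self.logger.info("Fetching %s", path)


# Module-level code that has no class to mix into can still obtain a logger:
log = LoggingMixin().logger
log.warning("Falling back to %s", "airflow.default_login")
```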
Closes #2592 from Fokko/AIRFLOW-1582-Improve- logging-structure Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7a51890 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7a51890 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7a51890 Branch: refs/heads/master Commit: a7a518902dcf1e7fd4bf477cf57cee691f181b29 Parents: 5de632e Author: Fokko Driesprong <[email protected]> Authored: Wed Sep 13 09:36:58 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Wed Sep 13 09:36:58 2017 +0200 ---------------------------------------------------------------------- UPDATING.md | 4 +- airflow/__init__.py | 10 +- airflow/api/__init__.py | 12 +- airflow/api/auth/backend/kerberos_auth.py | 14 +- airflow/api/common/experimental/get_task.py | 4 - .../common/experimental/get_task_instance.py | 4 - airflow/bin/airflow | 1 - airflow/bin/cli.py | 46 +-- airflow/configuration.py | 25 +- .../auth/backends/github_enterprise_auth.py | 5 +- airflow/contrib/auth/backends/google_auth.py | 11 +- airflow/contrib/auth/backends/kerberos_auth.py | 5 +- airflow/contrib/auth/backends/ldap_auth.py | 32 +- airflow/contrib/auth/backends/password_auth.py | 10 +- airflow/contrib/executors/mesos_executor.py | 51 +-- airflow/contrib/hooks/bigquery_hook.py | 57 +-- airflow/contrib/hooks/cloudant_hook.py | 10 +- airflow/contrib/hooks/databricks_hook.py | 15 +- airflow/contrib/hooks/datadog_hook.py | 13 +- airflow/contrib/hooks/datastore_hook.py | 3 +- airflow/contrib/hooks/ftp_hook.py | 10 +- airflow/contrib/hooks/gcp_api_base_hook.py | 13 +- airflow/contrib/hooks/gcp_dataflow_hook.py | 29 +- airflow/contrib/hooks/gcp_dataproc_hook.py | 28 +- airflow/contrib/hooks/gcp_mlengine_hook.py | 48 ++- airflow/contrib/hooks/gcs_hook.py | 10 +- airflow/contrib/hooks/jira_hook.py | 9 +- airflow/contrib/hooks/qubole_hook.py | 22 +- airflow/contrib/hooks/redis_hook.py | 16 +- airflow/contrib/hooks/salesforce_hook.py | 31 +- airflow/contrib/hooks/spark_sql_hook.py | 10 +- airflow/contrib/hooks/spark_submit_hook.py | 24 +- airflow/contrib/hooks/sqoop_hook.py | 17 +- airflow/contrib/hooks/ssh_hook.py | 34 +- airflow/contrib/operators/bigquery_operator.py | 4 +- .../operators/bigquery_table_delete_operator.py | 4 +- .../contrib/operators/bigquery_to_bigquery.py | 8 +- airflow/contrib/operators/bigquery_to_gcs.py | 4 +- .../contrib/operators/databricks_operator.py | 25 +- airflow/contrib/operators/dataproc_operator.py | 30 +- .../operators/datastore_export_operator.py | 5 +- .../operators/datastore_import_operator.py | 6 +- airflow/contrib/operators/ecs_operator.py | 24 +- .../contrib/operators/emr_add_steps_operator.py | 7 +- .../operators/emr_create_job_flow_operator.py | 10 +- .../emr_terminate_job_flow_operator.py | 7 +- airflow/contrib/operators/file_to_wasb.py | 14 +- airflow/contrib/operators/fs_operator.py | 4 +- .../contrib/operators/gcs_download_operator.py | 6 +- airflow/contrib/operators/gcs_to_bq.py | 8 +- airflow/contrib/operators/hipchat_operator.py | 3 +- airflow/contrib/operators/mlengine_operator.py | 30 +- .../operators/mlengine_prediction_summary.py | 2 - airflow/contrib/operators/mysql_to_gcs.py | 4 +- airflow/contrib/operators/sftp_operator.py | 7 +- .../contrib/operators/spark_submit_operator.py | 6 +- airflow/contrib/operators/ssh_operator.py | 1 - airflow/contrib/operators/vertica_operator.py | 5 +- airflow/contrib/operators/vertica_to_hive.py | 5 +- 
airflow/contrib/sensors/bigquery_sensor.py | 5 +- airflow/contrib/sensors/datadog_sensor.py | 5 +- airflow/contrib/sensors/emr_base_sensor.py | 9 +- airflow/contrib/sensors/emr_job_flow_sensor.py | 6 +- airflow/contrib/sensors/emr_step_sensor.py | 5 +- airflow/contrib/sensors/ftp_sensor.py | 4 +- airflow/contrib/sensors/gcs_sensor.py | 7 +- airflow/contrib/sensors/hdfs_sensors.py | 12 +- airflow/contrib/sensors/jira_sensor.py | 27 +- airflow/contrib/sensors/redis_key_sensor.py | 4 - airflow/contrib/sensors/wasb_sensor.py | 11 +- .../contrib/task_runner/cgroup_task_runner.py | 49 +-- airflow/executors/__init__.py | 8 +- airflow/executors/base_executor.py | 18 +- airflow/executors/celery_executor.py | 24 +- airflow/executors/dask_executor.py | 10 +- airflow/executors/local_executor.py | 11 +- airflow/executors/sequential_executor.py | 4 +- airflow/hooks/S3_hook.py | 52 +-- airflow/hooks/base_hook.py | 9 +- airflow/hooks/dbapi_hook.py | 33 +- airflow/hooks/druid_hook.py | 10 +- airflow/hooks/hive_hooks.py | 37 +- airflow/hooks/http_hook.py | 7 +- airflow/hooks/oracle_hook.py | 9 +- airflow/hooks/pig_hook.py | 6 +- airflow/hooks/presto_hook.py | 4 - airflow/hooks/webhdfs_hook.py | 17 +- airflow/hooks/zendesk_hook.py | 20 +- airflow/jobs.py | 349 ++++++++++--------- airflow/models.py | 211 +++++------ airflow/operators/bash_operator.py | 23 +- airflow/operators/check_operator.py | 24 +- airflow/operators/dagrun_operator.py | 6 +- airflow/operators/docker_operator.py | 12 +- airflow/operators/generic_transfer.py | 13 +- airflow/operators/hive_operator.py | 4 +- airflow/operators/hive_stats_operator.py | 10 +- airflow/operators/hive_to_druid.py | 18 +- airflow/operators/hive_to_mysql.py | 14 +- airflow/operators/hive_to_samba_operator.py | 6 +- airflow/operators/http_operator.py | 8 +- airflow/operators/jdbc_operator.py | 7 +- airflow/operators/latest_only_operator.py | 19 +- airflow/operators/mssql_operator.py | 5 +- airflow/operators/mssql_to_hive.py | 6 +- airflow/operators/mysql_operator.py | 5 +- airflow/operators/mysql_to_hive.py | 5 +- airflow/operators/oracle_operator.py | 5 +- airflow/operators/pig_operator.py | 4 +- airflow/operators/postgres_operator.py | 5 +- airflow/operators/presto_to_mysql.py | 12 +- airflow/operators/python_operator.py | 24 +- airflow/operators/redshift_to_s3_operator.py | 17 +- airflow/operators/s3_file_transform_operator.py | 30 +- airflow/operators/s3_to_hive_operator.py | 39 +-- airflow/operators/sensors.py | 49 +-- airflow/operators/slack_operator.py | 9 +- airflow/operators/sqlite_operator.py | 5 +- airflow/plugins_manager.py | 11 +- airflow/security/kerberos.py | 25 +- airflow/settings.py | 17 +- airflow/task_runner/base_task_runner.py | 9 +- airflow/utils/dag_processing.py | 55 +-- airflow/utils/db.py | 11 +- airflow/utils/email.py | 8 +- airflow/utils/log/LoggingMixin.py | 45 +++ airflow/utils/log/file_task_handler.py | 34 +- airflow/utils/log/gcs_task_handler.py | 125 ++++++- airflow/utils/log/s3_task_handler.py | 97 +++++- airflow/utils/logging.py | 252 ------------- airflow/utils/timeout.py | 17 +- airflow/www/api/experimental/endpoints.py | 6 +- airflow/www/app.py | 10 +- airflow/www/views.py | 9 +- setup.py | 11 - tests/contrib/hooks/test_databricks_hook.py | 15 +- .../contrib/operators/test_dataproc_operator.py | 59 ++-- tests/core.py | 16 +- tests/operators/sensors.py | 53 +-- tests/utils/log/test_logging.py | 108 ++++++ tests/utils/test_logging.py | 103 ------ 141 files changed, 1578 insertions(+), 1747 deletions(-) 
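Before the file-by-file changes, one more hedged illustration: the UPDATING.md hunk that opens the diff below names central configuration (`default_airflow_logging.py`) and pluggable handlers as the main benefit of the rewrite. The dictionary here is a made-up minimal example in standard `logging.config.dictConfig` form — handler names, format string, and file path are assumptions, not the contents of Airflow's actual template — but the `airflow.task` logger name is the one the new cli.py code retrieves.

```python
import logging.config

# Hypothetical minimal config in the spirit of default_airflow_logging.py.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow',
        },
        'task_file': {
            'class': 'logging.FileHandler',
            'formatter': 'airflow',
            'filename': '/tmp/airflow-task.log',  # assumed path, for illustration only
        },
    },
    'loggers': {
        'airflow': {'handlers': ['console'], 'level': 'INFO'},
        # Task logs can be routed to a different handler than the rest of Airflow
        # without touching the calling code.
        'airflow.task': {'handlers': ['task_file'], 'level': 'INFO', 'propagate': False},
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
logging.getLogger('airflow.task').info('Task logs go to their own handler')
```

Swapping `task_file` for a remote handler (as the `airflow/utils/log/s3_task_handler.py` and `gcs_task_handler.py` files in this diff provide) then becomes a configuration change rather than a code change.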
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 92ee4b4..cde7141 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -13,7 +13,9 @@ assists people when migrating to a new version. - No updates are required if you are using ftpHook, it will continue work as is. ### Logging update - Logs now are stored in the log folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``. +Airflow's logging has been rewritten to use Python's builtin `logging` module to perform system logging. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The main benefit this brings is the easy configuration of logging through `default_airflow_logging.py` and the ability to use different handlers for logging. + +Logs now are stored in the log folder as `{dag_id}/{task_id}/{execution_date}/{try_number}.log`. ### New Features http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/__init__.py b/airflow/__init__.py index 3daa6e2..8844eeb 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -21,9 +21,10 @@ in their PYTHONPATH. airflow_login should be based off the """ from builtins import object from airflow import version +from airflow.utils.log.LoggingMixin import LoggingMixin + __version__ = version.version -import logging import sys from airflow import configuration as conf @@ -40,13 +41,15 @@ login = None def load_login(): + log = LoggingMixin().logger + auth_backend = 'airflow.default_login' try: if conf.getboolean('webserver', 'AUTHENTICATE'): auth_backend = conf.get('webserver', 'auth_backend') except conf.AirflowConfigException: if conf.getboolean('webserver', 'AUTHENTICATE'): - logging.warning( + log.warning( "auth_backend not found in webserver config reverting to " "*deprecated* behavior of importing airflow_login") auth_backend = "airflow_login" @@ -55,7 +58,7 @@ def load_login(): global login login = import_module(auth_backend) except ImportError as err: - logging.critical( + log.critical( "Cannot import authentication module %s. " "Please correct your authentication backend or disable authentication: %s", auth_backend, err @@ -76,7 +79,6 @@ from airflow import operators from airflow import hooks from airflow import executors from airflow import macros -from airflow import contrib operators._integrate_plugins() hooks._integrate_plugins() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py index ae47abf..39edbed 100644 --- a/airflow/api/__init__.py +++ b/airflow/api/__init__.py @@ -13,14 +13,16 @@ # limitations under the License.
from __future__ import print_function -import logging - from airflow.exceptions import AirflowException from airflow import configuration as conf from importlib import import_module +from airflow.utils.log.LoggingMixin import LoggingMixin + api_auth = None +log = LoggingMixin().logger + def load_auth(): auth_backend = 'airflow.api.auth.backend.default' @@ -33,6 +35,8 @@ def load_auth(): global api_auth api_auth = import_module(auth_backend) except ImportError as err: - logging.critical("Cannot import {} for API authentication due to: {}" - .format(auth_backend, err)) + log.critical( + "Cannot import %s for API authentication due to: %s", + auth_backend, err + ) raise AirflowException(err) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/auth/backend/kerberos_auth.py ---------------------------------------------------------------------- diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py index d1c3b70..73a5aa2 100644 --- a/airflow/api/auth/backend/kerberos_auth.py +++ b/airflow/api/auth/backend/kerberos_auth.py @@ -23,10 +23,12 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from future.standard_library import install_aliases + +from airflow.utils.log.LoggingMixin import LoggingMixin + install_aliases() import kerberos -import logging import os from airflow import configuration as conf @@ -45,6 +47,8 @@ client_auth = HTTPKerberosAuth(service='airflow') _SERVICE_NAME = None +log = LoggingMixin().logger + def init_app(app): global _SERVICE_NAME @@ -52,7 +56,7 @@ def init_app(app): hostname = app.config.get('SERVER_NAME') if not hostname: hostname = getfqdn() - logging.info("Kerberos: hostname {}".format(hostname)) + log.info("Kerberos: hostname %s", hostname) service = 'airflow' @@ -62,12 +66,12 @@ def init_app(app): os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab') try: - logging.info("Kerberos init: {} {}".format(service, hostname)) + log.info("Kerberos init: %s %s", service, hostname) principal = kerberos.getServerPrincipalDetails(service, hostname) except kerberos.KrbError as err: - logging.warning("Kerberos: {}".format(err)) + log.warning("Kerberos: %s", err) else: - logging.info("Kerberos API: server is {}".format(principal)) + log.info("Kerberos API: server is %s", principal) def _unauthorized(): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py index 39ab423..9023ad1 100644 --- a/airflow/api/common/experimental/get_task.py +++ b/airflow/api/common/experimental/get_task.py @@ -12,13 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging - from airflow.exceptions import AirflowException from airflow.models import DagBag -_log = logging.getLogger(__name__) - def get_task(dag_id, task_id): """Return the task object identified by the given dag_id and task_id.""" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task_instance.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py index 4c50731..7ab5e6e 100644 --- a/airflow/api/common/experimental/get_task_instance.py +++ b/airflow/api/common/experimental/get_task_instance.py @@ -12,13 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging - from airflow.exceptions import AirflowException from airflow.models import DagBag -_log = logging.getLogger(__name__) - def get_task_instance(dag_id, task_id, execution_date): """Return the task object identified by the given dag_id and task_id.""" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/airflow ---------------------------------------------------------------------- diff --git a/airflow/bin/airflow b/airflow/bin/airflow index 0598596..2c0024d 100755 --- a/airflow/bin/airflow +++ b/airflow/bin/airflow @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging import os from airflow import configuration from airflow.bin.cli import CLIFactory http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index a0545c3..56f1855 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -53,6 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance, from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils +from airflow.utils.log.LoggingMixin import LoggingMixin from airflow.www.app import cached_app from sqlalchemy import func @@ -63,6 +64,8 @@ api_module = import_module(conf.get('cli', 'api_client')) api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'), auth=api.api_auth.client_auth) +log = LoggingMixin().logger + def sigint_handler(sig, frame): sys.exit(0) @@ -186,19 +189,21 @@ def trigger_dag(args): :param args: :return: """ + log = LoggingMixin().logger try: message = api_client.trigger_dag(dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, execution_date=args.exec_date) except IOError as err: - logging.error(err) + log.error(err) raise AirflowException(err) - - logging.info(message) + log.info(message) def pool(args): + log = LoggingMixin().logger + def _tabulate(pools): return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'], tablefmt="fancy_grid") @@ -215,9 +220,9 @@ def pool(args): else: pools = api_client.get_pools() except (AirflowException, IOError) as err: - logging.error(err) + log.error(err) else: - logging.info(_tabulate(pools=pools)) + log.info(_tabulate(pools=pools)) def variables(args): @@ -325,6 +330,8 @@ def run(args, dag=None): if dag: args.dag_id = dag.dag_id + log = LoggingMixin().logger + # Load custom airflow config if args.cfg_path: with open(args.cfg_path, 'r') as conf_file: @@ -343,7 +350,7 @@ def 
run(args, dag=None): dag = get_dag(args) elif not dag: session = settings.Session() - logging.info('Loading pickle id {args.pickle}'.format(args=args)) + log.info('Loading pickle id {args.pickle}'.format(args=args)) dag_pickle = session.query( DagPickle).filter(DagPickle.id == args.pickle).first() if not dag_pickle: @@ -354,11 +361,11 @@ def run(args, dag=None): ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() - logger = logging.getLogger('airflow.task') + log = logging.getLogger('airflow.task') if args.raw: - logger = logging.getLogger('airflow.task.raw') + log = logging.getLogger('airflow.task.raw') - for handler in logger.handlers: + for handler in log.handlers: try: handler.set_context(ti) except AttributeError: @@ -367,7 +374,7 @@ def run(args, dag=None): pass hostname = socket.getfqdn() - logging.info("Running on host {}".format(hostname)) + log.info("Running on host %s", hostname) if args.local: run_job = jobs.LocalTaskJob( @@ -396,6 +403,7 @@ def run(args, dag=None): session.add(pickle) session.commit() pickle_id = pickle.id + # TODO: This should be written to a log print(( 'Pickled dag {dag} ' 'as pickle_id:{pickle_id}').format(**locals())) @@ -427,7 +435,7 @@ def run(args, dag=None): # might subsequently read from the log to insert into S3 or # Google cloud storage. Explicitly close the handler is # needed in order to upload to remote storage services. - for handler in logger.handlers: + for handler in log.handlers: handler.flush() handler.close() @@ -449,6 +457,7 @@ def task_failed_deps(args): dep_context = DepContext(deps=SCHEDULER_DEPS) failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) + # TODO, Do we want to print or log this if failed_deps: print("Task instance dependencies not met:") for dep in failed_deps: @@ -605,8 +614,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): def start_refresh(gunicorn_master_proc): batch_size = conf.getint('webserver', 'worker_refresh_batch_size') - logging.debug('%s doing a refresh of %s workers', - state, batch_size) + log.debug('%s doing a refresh of %s workers', state, batch_size) sys.stdout.flush() sys.stderr.flush() @@ -628,14 +636,14 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): # Whenever some workers are not ready, wait until all workers are ready if num_ready_workers_running < num_workers_running: - logging.debug('%s some workers are starting up, waiting...', state) + log.debug('%s some workers are starting up, waiting...', state) sys.stdout.flush() time.sleep(1) # Kill a worker gracefully by asking gunicorn to reduce number of workers elif num_workers_running > num_workers_expected: excess = num_workers_running - num_workers_expected - logging.debug('%s killing %s workers', state, excess) + log.debug('%s killing %s workers', state, excess) for _ in range(excess): gunicorn_master_proc.send_signal(signal.SIGTTOU) @@ -646,7 +654,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): # Start a new worker by asking gunicorn to increase number of workers elif num_workers_running == num_workers_expected: refresh_interval = conf.getint('webserver', 'worker_refresh_interval') - logging.debug( + log.debug( '%s sleeping for %ss starting doing a refresh...', state, refresh_interval ) @@ -655,7 +663,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): else: # num_ready_workers_running == num_workers_running < num_workers_expected - logging.error(( + log.error(( "%s some workers seem to have died and gunicorn" "did not restart them as 
expected" ), state) @@ -770,7 +778,7 @@ def webserver(args): gunicorn_master_proc_pid = int(f.read()) break except IOError: - logging.debug("Waiting for gunicorn's pid file to be created.") + log.debug("Waiting for gunicorn's pid file to be created.") time.sleep(0.1) gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) @@ -896,8 +904,6 @@ def resetdb(args): if args.yes or input( "This will drop existing tables if they exist. " "Proceed? (y/n)").upper() == "Y": - logging.basicConfig(level=settings.LOGGING_LEVEL, - format=settings.SIMPLE_LOG_FORMAT) db_utils.resetdb() else: print("Bail.") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index 460d975..db196f9 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -19,7 +19,6 @@ from __future__ import unicode_literals import copy import errno -import logging import os import six import subprocess @@ -28,6 +27,9 @@ import shlex import sys from future import standard_library + +from airflow.utils.log.LoggingMixin import LoggingMixin + standard_library.install_aliases() from builtins import str @@ -36,6 +38,8 @@ from six.moves import configparser from airflow.exceptions import AirflowConfigException +log = LoggingMixin().logger + # show Airflow's deprecation warnings warnings.filterwarnings( action='default', category=DeprecationWarning, module='airflow') @@ -198,8 +202,9 @@ class AirflowConfigParser(ConfigParser): return option else: - logging.warning("section/key [{section}/{key}] not found " - "in config".format(**locals())) + log.warning( + "section/key [{section}/{key}] not found in config".format(**locals()) + ) raise AirflowConfigException( "section/key [{section}/{key}] not found " @@ -366,20 +371,22 @@ else: TEMPLATE_START = ( '# ----------------------- TEMPLATE BEGINS HERE -----------------------') if not os.path.isfile(TEST_CONFIG_FILE): - logging.info( - 'Creating new Airflow config file for unit tests in: {}'.format( - TEST_CONFIG_FILE)) + log.info( + 'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE + ) with open(TEST_CONFIG_FILE, 'w') as f: cfg = parameterized_config(TEST_CONFIG) f.write(cfg.split(TEMPLATE_START)[-1].strip()) if not os.path.isfile(AIRFLOW_CONFIG): - logging.info('Creating new Airflow config file in: {}'.format( - AIRFLOW_CONFIG)) + log.info( + 'Creating new Airflow config file in: %s', + AIRFLOW_CONFIG + ) with open(AIRFLOW_CONFIG, 'w') as f: cfg = parameterized_config(DEFAULT_CONFIG) f.write(cfg.split(TEMPLATE_START)[-1].strip()) -logging.info("Reading the config from " + AIRFLOW_CONFIG) +log.info("Reading the config from %s", AIRFLOW_CONFIG) conf = AirflowConfigParser() conf.read(AIRFLOW_CONFIG) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/github_enterprise_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index 91126c7..459e9c9 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import logging - import flask_login # Need to expose these downstream @@ -29,8 +27,9 @@ from flask_oauthlib.client import OAuth from airflow import models, configuration, settings from airflow.configuration import AirflowConfigException +from airflow.utils.log.LoggingMixin import LoggingMixin -_log = logging.getLogger(__name__) +log = LoggingMixin().logger def get_config_param(param): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/google_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py index 70c8e13..f38f725 100644 --- a/airflow/contrib/auth/backends/google_auth.py +++ b/airflow/contrib/auth/backends/google_auth.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging - import flask_login # Need to expose these downstream @@ -28,9 +26,9 @@ from flask import url_for, redirect, request from flask_oauthlib.client import OAuth from airflow import models, configuration, settings -from airflow.configuration import AirflowConfigException +from airflow.utils.log.LoggingMixin import LoggingMixin -_log = logging.getLogger(__name__) +log = LoggingMixin().logger def get_config_param(param): @@ -106,7 +104,7 @@ class GoogleAuthBackend(object): self.oauth_callback) def login(self, request): - _log.debug('Redirecting user to Google login') + log.debug('Redirecting user to Google login') return self.google_oauth.authorize(callback=url_for( 'google_oauth_callback', _external=True, @@ -142,7 +140,7 @@ class GoogleAuthBackend(object): return GoogleUser(user) def oauth_callback(self): - _log.debug('Google OAuth callback called') + log.debug('Google OAuth callback called') next_url = request.args.get('next') or url_for('admin.index') @@ -162,7 +160,6 @@ class GoogleAuthBackend(object): return redirect(url_for('airflow.noaccess')) except AuthenticationError: - _log.exception('') return redirect(url_for('airflow.noaccess')) session = settings.Session() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/kerberos_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py index e381059..ffb711f 100644 --- a/airflow/contrib/auth/backends/kerberos_auth.py +++ b/airflow/contrib/auth/backends/kerberos_auth.py @@ -29,8 +29,7 @@ from flask import url_for, redirect from airflow import settings from airflow import models from airflow import configuration - -import logging +from airflow.utils.log.LoggingMixin import LoggingMixin login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below @@ -41,7 +40,7 @@ class AuthenticationError(Exception): pass -class KerberosUser(models.User): +class KerberosUser(models.User, LoggingMixin): def __init__(self, user): self.user = user http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/ldap_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index 341f710..8ce0875 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ 
b/airflow/contrib/auth/backends/ldap_auth.py @@ -30,16 +30,16 @@ from airflow import models from airflow import configuration from airflow.configuration import AirflowConfigException -import logging - import traceback import re +from airflow.utils.log.LoggingMixin import LoggingMixin + login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below login_manager.login_message = None -LOG = logging.getLogger(__name__) +log = LoggingMixin().logger class AuthenticationError(Exception): @@ -64,7 +64,7 @@ def get_ldap_connection(dn=None, password=None): conn = Connection(server, native(dn), native(password)) if not conn.bind(): - LOG.error("Cannot bind to ldap server: %s ", conn.last_error) + log.error("Cannot bind to ldap server: %s ", conn.last_error) raise AuthenticationError("Cannot bind to ldap server") return conn @@ -74,7 +74,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam search_filter = '(&({0}))'.format(group_filter) if not conn.search(native(search_base), native(search_filter), attributes=[native(user_name_attr)]): - LOG.warning("Unable to find group for %s %s", search_base, search_filter) + log.warning("Unable to find group for %s %s", search_base, search_filter) else: for resp in conn.response: if ( @@ -95,11 +95,11 @@ def groups_user(conn, search_base, user_filter, user_name_att, username): memberof_attr = "memberOf" res = conn.search(native(search_base), native(search_filter), attributes=[native(memberof_attr)]) if not res: - LOG.info("Cannot find user %s", username) + log.info("Cannot find user %s", username) raise AuthenticationError("Invalid username or password") if conn.response and memberof_attr not in conn.response[0]["attributes"]: - LOG.warning("""Missing attribute "%s" when looked-up in Ldap database. + log.warning("""Missing attribute "%s" when looked-up in Ldap database. The user does not seem to be a member of a group and therefore won't see any dag if the option filter_by_owner=True and owner_mode=ldapgroup are set""", memberof_attr) return [] @@ -111,7 +111,7 @@ def groups_user(conn, search_base, user_filter, user_name_att, username): try: groups_list = [regex.search(i).group(1) for i in user_groups] except IndexError: - LOG.warning("Parsing error when retrieving the user's group(s)." + log.warning("Parsing error when retrieving the user's group(s)." " Check if the user belongs to at least one group" " or if the user's groups name do not contain special characters") @@ -134,7 +134,7 @@ class LdapUser(models.User): user.username) except AirflowConfigException: self.superuser = True - LOG.debug("Missing configuration for superuser settings. Skipping.") + log.debug("Missing configuration for superuser settings. Skipping.") try: self.data_profiler = group_contains_user(conn, @@ -144,7 +144,7 @@ class LdapUser(models.User): user.username) except AirflowConfigException: self.data_profiler = True - LOG.debug("Missing configuration for dataprofiler settings. Skipping") + log.debug("Missing configuration for dataprofiler settings. Skipping") # Load the ldap group(s) a user belongs to try: @@ -154,7 +154,7 @@ class LdapUser(models.User): configuration.get("ldap", "user_name_attr"), user.username) except AirflowConfigException: - LOG.debug("Missing configuration for ldap settings. Skipping") + log.debug("Missing configuration for ldap settings. Skipping") @staticmethod def try_login(username, password): @@ -185,7 +185,7 @@ class LdapUser(models.User): # todo: use list or result? 
if not res: - LOG.info("Cannot find user %s", username) + log.info("Cannot find user %s", username) raise AuthenticationError("Invalid username or password") entry = conn.response[0] @@ -200,14 +200,14 @@ class LdapUser(models.User): try: conn = get_ldap_connection(entry['dn'], password) except KeyError as e: - LOG.error(""" + log.error(""" Unable to parse LDAP structure. If you're using Active Directory and not specifying an OU, you must set search_scope=SUBTREE in airflow.cfg. %s """ % traceback.format_exc()) raise LdapException("Could not parse LDAP structure. Try setting search_scope in airflow.cfg, or check logs") if not conn: - LOG.info("Password incorrect for user %s", username) + log.info("Password incorrect for user %s", username) raise AuthenticationError("Invalid username or password") def is_active(self): @@ -237,7 +237,7 @@ class LdapUser(models.User): @login_manager.user_loader def load_user(userid): - LOG.debug("Loading user %s", userid) + log.debug("Loading user %s", userid) if not userid or userid == 'None': return None @@ -270,7 +270,7 @@ def login(self, request): try: LdapUser.try_login(username, password) - LOG.info("User %s successfully authenticated", username) + log.info("User %s successfully authenticated", username) session = settings.Session() user = session.query(models.User).filter( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/password_auth.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py index 000b986..3ad2a8b 100644 --- a/airflow/contrib/auth/backends/password_auth.py +++ b/airflow/contrib/auth/backends/password_auth.py @@ -32,15 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property from airflow import settings from airflow import models -from airflow import configuration - -import logging +from airflow.utils.log.LoggingMixin import LoggingMixin login_manager = flask_login.LoginManager() login_manager.login_view = 'airflow.login' # Calls login() below login_manager.login_message = None -LOG = logging.getLogger(__name__) +log = LoggingMixin().logger PY3 = version_info[0] == 3 @@ -94,7 +92,7 @@ class PasswordUser(models.User): @login_manager.user_loader def load_user(userid): - LOG.debug("Loading user %s", userid) + log.debug("Loading user %s", userid) if not userid or userid == 'None': return None @@ -137,7 +135,7 @@ def login(self, request): if not user.authenticate(password): session.close() raise AuthenticationError() - LOG.info("User %s successfully authenticated", username) + log.info("User %s successfully authenticated", username) flask_login.login_user(user) session.commit() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/executors/mesos_executor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py index 49788fc..19d72ed 100644 --- a/airflow/contrib/executors/mesos_executor.py +++ b/airflow/contrib/executors/mesos_executor.py @@ -13,9 +13,12 @@ # limitations under the License. 
from future import standard_library + +from airflow.utils.log.LoggingMixin import LoggingMixin +from airflow.www.utils import LoginMixin + standard_library.install_aliases() from builtins import str -import logging from queue import Queue import mesos.interface @@ -41,7 +44,7 @@ def get_framework_name(): # AirflowMesosScheduler, implements Mesos Scheduler interface # To schedule airflow jobs on mesos -class AirflowMesosScheduler(mesos.interface.Scheduler): +class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin): """ Airflow Mesos scheduler implements mesos scheduler interface to schedule airflow tasks on mesos. @@ -49,7 +52,6 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): 'airflow run <dag_id> <task_instance_id> <start_date> --local -p=<pickle>' to run on a mesos slave. """ - def __init__(self, task_queue, result_queue, @@ -63,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): self.task_key_map = {} def registered(self, driver, frameworkId, masterInfo): - logging.info("AirflowScheduler registered to mesos with framework ID %s", frameworkId.value) + self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value) if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'): # Import here to work around a circular import error @@ -84,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): Session.remove() def reregistered(self, driver, masterInfo): - logging.info("AirflowScheduler re-registered to mesos") + self.logger.info("AirflowScheduler re-registered to mesos") def disconnected(self, driver): - logging.info("AirflowScheduler disconnected from mesos") + self.logger.info("AirflowScheduler disconnected from mesos") def offerRescinded(self, driver, offerId): - logging.info("AirflowScheduler offer %s rescinded", str(offerId)) + self.logger.info("AirflowScheduler offer %s rescinded", str(offerId)) def frameworkMessage(self, driver, executorId, slaveId, message): - logging.info("AirflowScheduler received framework message %s", message) + self.logger.info("AirflowScheduler received framework message %s", message) def executorLost(self, driver, executorId, slaveId, status): - logging.warning("AirflowScheduler executor %s lost", str(executorId)) + self.logger.warning("AirflowScheduler executor %s lost", str(executorId)) def slaveLost(self, driver, slaveId): - logging.warning("AirflowScheduler slave %s lost", str(slaveId)) + self.logger.warning("AirflowScheduler slave %s lost", str(slaveId)) def error(self, driver, message): - logging.error("AirflowScheduler driver aborted %s", message) + self.logger.error("AirflowScheduler driver aborted %s", message) raise AirflowException("AirflowScheduler driver aborted %s" % message) def resourceOffers(self, driver, offers): @@ -116,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): elif resource.name == "mem": offerMem += resource.scalar.value - logging.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem) + self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem) remainingCpus = offerCpus remainingMem = offerMem @@ -129,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): self.task_counter += 1 self.task_key_map[str(tid)] = key - logging.info("Launching task %d using offer %s", tid, offer.id.value) + self.logger.info("Launching task %d using offer %s", tid, offer.id.value) task = mesos_pb2.TaskInfo() task.task_id.value = 
str(tid) @@ -159,15 +161,17 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): driver.launchTasks(offer.id, tasks) def statusUpdate(self, driver, update): - logging.info("Task %s is in state %s, data %s", - update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)) + self.logger.info( + "Task %s is in state %s, data %s", + update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data) + ) try: key = self.task_key_map[update.task_id.value] except KeyError: # The map may not contain an item if the framework re-registered after a failover. # Discard these tasks. - logging.warning("Unrecognised task key %s" % update.task_id.value) + self.logger.warning("Unrecognised task key %s", update.task_id.value) return if update.state == mesos_pb2.TASK_FINISHED: @@ -181,7 +185,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler): self.task_queue.task_done() -class MesosExecutor(BaseExecutor): +class MesosExecutor(BaseExecutor, LoginMixin): """ MesosExecutor allows distributing the execution of task instances to multiple mesos workers. @@ -192,7 +196,6 @@ class MesosExecutor(BaseExecutor): elastic distributed systems to easily be built and run effectively. See http://mesos.apache.org/ """ - def start(self): self.task_queue = Queue() self.result_queue = Queue() @@ -200,7 +203,7 @@ class MesosExecutor(BaseExecutor): framework.user = '' if not configuration.get('mesos', 'MASTER'): - logging.error("Expecting mesos master URL for mesos executor") + self.logger.error("Expecting mesos master URL for mesos executor") raise AirflowException("mesos.master not provided for mesos executor") master = configuration.get('mesos', 'MASTER') @@ -236,17 +239,19 @@ class MesosExecutor(BaseExecutor): else: framework.checkpoint = False - logging.info('MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s', - master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)) + self.logger.info( + 'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s', + master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint) + ) implicit_acknowledgements = 1 if configuration.getboolean('mesos', 'AUTHENTICATE'): if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'): - logging.error("Expecting authentication principal in the environment") + self.logger.error("Expecting authentication principal in the environment") raise AirflowException("mesos.default_principal not provided in authenticated mode") if not configuration.get('mesos', 'DEFAULT_SECRET'): - logging.error("Expecting authentication secret in the environment") + self.logger.error("Expecting authentication secret in the environment") raise AirflowException("mesos.default_secret not provided in authenticated mode") credential = mesos_pb2.Credential() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index d2ce2b0..497fa28 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -18,7 +18,6 @@ This module contains a BigQuery Hook, as well as a very basic PEP 249 implementation for BigQuery. 
""" -import logging import time from apiclient.discovery import build, HttpError @@ -33,11 +32,10 @@ from past.builtins import basestring from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.hooks.dbapi_hook import DbApiHook +from airflow.utils.log.LoggingMixin import LoggingMixin -logging.getLogger("bigquery").setLevel(logging.INFO) - -class BigQueryHook(GoogleCloudBaseHook, DbApiHook): +class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): """ Interact with BigQuery. This hook uses the Google Cloud Platform connection. @@ -178,13 +176,12 @@ class BigQueryConnection(object): "BigQueryConnection does not have transactions") -class BigQueryBaseCursor(object): +class BigQueryBaseCursor(LoggingMixin): """ The BigQuery base cursor contains helper methods to execute queries against BigQuery. The methods can be used directly by operators, in cases where a PEP 249 cursor isn't needed. """ - def __init__(self, service, project_id): self.service = service self.project_id = project_id @@ -290,10 +287,12 @@ class BigQueryBaseCursor(object): :param print_header: Whether to print a header for a CSV file extract. :type print_header: boolean """ + source_project, source_dataset, source_table = \ _split_tablename(table_input=source_project_dataset_table, default_project_id=self.project_id, var_name='source_project_dataset_table') + configuration = { 'extract': { 'sourceTable': { @@ -500,7 +499,7 @@ class BigQueryBaseCursor(object): "'WRITE_APPEND' or 'WRITE_TRUNCATE'." ) else: - logging.info( + self.logger.info( "Adding experimental " "'schemaUpdateOptions': {0}".format(schema_update_options) ) @@ -577,12 +576,12 @@ class BigQueryBaseCursor(object): ) ) else: - logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id) time.sleep(5) except HttpError as err: if err.resp.status in [500, 503]: - logging.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) + self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) time.sleep(5) else: raise Exception( @@ -661,14 +660,14 @@ class BigQueryBaseCursor(object): datasetId=deletion_dataset, tableId=deletion_table) \ .execute() - logging.info('Deleted table %s:%s.%s.', + self.logger.info('Deleted table %s:%s.%s.', deletion_project, deletion_dataset, deletion_table) except HttpError: if not ignore_if_missing: raise Exception( 'Table deletion failed. Table does not exist.') else: - logging.info('Table does not exist. Skipping.') + self.logger.info('Table does not exist. Skipping.') def run_table_upsert(self, dataset_id, table_resource, project_id=None): @@ -695,8 +694,10 @@ class BigQueryBaseCursor(object): for table in tables_list_resp.get('tables', []): if table['tableReference']['tableId'] == table_id: # found the table, do update - logging.info('table %s:%s.%s exists, updating.', - project_id, dataset_id, table_id) + self.logger.info( + 'Table %s:%s.%s exists, updating.', + project_id, dataset_id, table_id + ) return self.service.tables().update(projectId=project_id, datasetId=dataset_id, tableId=table_id, @@ -711,8 +712,10 @@ class BigQueryBaseCursor(object): # If there is no next page, then the table doesn't exist. else: # do insert - logging.info('table %s:%s.%s does not exist. creating.', - project_id, dataset_id, table_id) + self.logger.info( + 'Table %s:%s.%s does not exist. 
creating.', + project_id, dataset_id, table_id + ) return self.service.tables().insert(projectId=project_id, datasetId=dataset_id, body=table_resource).execute() @@ -756,18 +759,20 @@ class BigQueryBaseCursor(object): 'tableId': view_table}} # check to see if the view we want to add already exists. if view_access not in access: - logging.info('granting table %s:%s.%s authorized view access to %s:%s dataset.', - view_project, view_dataset, view_table, - source_project, source_dataset) + self.logger.info( + 'Granting table %s:%s.%s authorized view access to %s:%s dataset.', + view_project, view_dataset, view_table, source_project, source_dataset + ) access.append(view_access) return self.service.datasets().patch(projectId=source_project, datasetId=source_dataset, body={'access': access}).execute() else: # if view is already in access, do nothing. - logging.info('table %s:%s.%s already has authorized view access to %s:%s dataset.', - view_project, view_dataset, view_table, - source_project, source_dataset) + self.logger.info( + 'Table %s:%s.%s already has authorized view access to %s:%s dataset.', + view_project, view_dataset, view_table, source_project, source_dataset + ) return source_dataset_resource @@ -1027,10 +1032,12 @@ def _split_tablename(table_input, default_project_id, var_name=None): if project_id is None: if var_name is not None: - logging.info( - 'project not included in {var}: ' - '{input}; using project "{project}"'.format( - var=var_name, input=table_input, project=default_project_id)) + log = LoggingMixin().logger + log.info( + 'Project not included in {var}: {input}; using project "{project}"'.format( + var=var_name, input=table_input, project=default_project_id + ) + ) project_id = default_project_id return project_id, dataset_id, table_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/cloudant_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py index 6cea26f..d9db08d 100644 --- a/airflow/contrib/hooks/cloudant_hook.py +++ b/airflow/contrib/hooks/cloudant_hook.py @@ -15,10 +15,10 @@ from past.builtins import unicode import cloudant -import logging from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook +from airflow.utils.log.LoggingMixin import LoggingMixin class CloudantHook(BaseHook): @@ -35,9 +35,11 @@ class CloudantHook(BaseHook): def _str(s): # cloudant-python doesn't support unicode. if isinstance(s, unicode): - logging.debug(('cloudant-python does not support unicode. ' - 'Encoding %s as ascii using "ignore".'), - s) + log = LoggingMixin().logger + log.debug( + 'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".', + s + ) return s.encode('ascii', 'ignore') return s http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/databricks_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py index 18e20c4..7b20433 100644 --- a/airflow/contrib/hooks/databricks_hook.py +++ b/airflow/contrib/hooks/databricks_hook.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - -import logging import requests from airflow import __version__ @@ -22,6 +20,7 @@ from airflow.hooks.base_hook import BaseHook from requests import exceptions as requests_exceptions from requests.auth import AuthBase +from airflow.utils.log.LoggingMixin import LoggingMixin try: from urllib import parse as urlparse @@ -35,7 +34,7 @@ CANCEL_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/cancel') USER_AGENT_HEADER = {'user-agent': 'airflow-{v}'.format(v=__version__)} -class DatabricksHook(BaseHook): +class DatabricksHook(BaseHook, LoggingMixin): """ Interact with Databricks. """ @@ -101,10 +100,10 @@ class DatabricksHook(BaseHook): host=self._parse_host(self.databricks_conn.host), endpoint=endpoint) if 'token' in self.databricks_conn.extra_dejson: - logging.info('Using token auth.') + self.logger.info('Using token auth.') auth = _TokenAuth(self.databricks_conn.extra_dejson['token']) else: - logging.info('Using basic auth.') + self.logger.info('Using basic auth.') auth = (self.databricks_conn.login, self.databricks_conn.password) if method == 'GET': request_func = requests.get @@ -130,8 +129,10 @@ class DatabricksHook(BaseHook): response.content, response.status_code)) except (requests_exceptions.ConnectionError, requests_exceptions.Timeout) as e: - logging.error(('Attempt {0} API Request to Databricks failed ' + - 'with reason: {1}').format(attempt_num, e)) + self.logger.error( + 'Attempt %s API Request to Databricks failed with reason: %s', + attempt_num, e + ) raise AirflowException(('API requests to Databricks failed {} times. ' + 'Giving up.').format(self.retry_limit)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datadog_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py index 2125701..0f5af00 100644 --- a/airflow/contrib/hooks/datadog_hook.py +++ b/airflow/contrib/hooks/datadog_hook.py @@ -13,14 +13,14 @@ # limitations under the License. import time -import logging - from airflow.hooks.base_hook import BaseHook from airflow.exceptions import AirflowException from datadog import initialize, api +from airflow.utils.log.LoggingMixin import LoggingMixin + -class DatadogHook(BaseHook): +class DatadogHook(BaseHook, LoggingMixin): """ Uses datadog API to send metrics of practically anything measurable, so it's possible to track # of db records inserted/deleted, records read @@ -32,7 +32,6 @@ class DatadogHook(BaseHook): :param datadog_conn_id: The connection to datadog, containing metadata for api keys. 
:param datadog_conn_id: string """ - def __init__(self, datadog_conn_id='datadog_default'): conn = self.get_connection(datadog_conn_id) self.api_key = conn.extra_dejson.get('api_key', None) @@ -48,7 +47,7 @@ class DatadogHook(BaseHook): if self.app_key is None: raise AirflowException("app_key must be specified in the Datadog connection details") - logging.info("Setting up api keys for datadog") + self.logger.info("Setting up api keys for Datadog") options = { 'api_key': self.api_key, 'app_key': self.app_key @@ -57,8 +56,8 @@ class DatadogHook(BaseHook): def validate_response(self, response): if response['status'] != 'ok': - logging.error("Data dog returned: " + response) - raise AirflowException("Error status received from datadog") + self.logger.error("Datadog returned: %s", response) + raise AirflowException("Error status received from Datadog") def send_metric(self, metric_name, datapoint, tags=None): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datastore_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py index 7a4386a..2ff1600 100644 --- a/airflow/contrib/hooks/datastore_hook.py +++ b/airflow/contrib/hooks/datastore_hook.py @@ -15,7 +15,6 @@ import json import time -import logging from apiclient.discovery import build from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook @@ -137,7 +136,7 @@ class DatastoreHook(GoogleCloudBaseHook): result = self.get_operation(name) state = result['metadata']['common']['state'] if state == 'PROCESSING': - logging.info('Operation is processing. Re-polling state in {} seconds' + self.logger.info('Operation is processing. Re-polling state in {} seconds' .format(polling_interval_in_seconds)) time.sleep(polling_interval_in_seconds) else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ftp_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py index 148811f..a6b3181 100644 --- a/airflow/contrib/hooks/ftp_hook.py +++ b/airflow/contrib/hooks/ftp_hook.py @@ -15,11 +15,12 @@ import datetime import ftplib -import logging import os.path from airflow.hooks.base_hook import BaseHook from past.builtins import basestring +from airflow.utils.log.LoggingMixin import LoggingMixin + def mlsd(conn, path="", facts=None): """ @@ -54,7 +55,7 @@ def mlsd(conn, path="", facts=None): yield (name, entry) -class FTPHook(BaseHook): +class FTPHook(BaseHook, LoggingMixin): """ Interact with FTP. 
@@ -166,10 +167,9 @@ class FTPHook(BaseHook): remote_path, remote_file_name = os.path.split(remote_full_path) conn.cwd(remote_path) - logging.info('Retrieving file from FTP: {}'.format(remote_full_path)) + self.logger.info('Retrieving file from FTP: %s', remote_full_path) conn.retrbinary('RETR %s' % remote_file_name, output_handle.write) - logging.info('Finished retrieving file from FTP: {}'.format( - remote_full_path)) + self.logger.info('Finished retrieving file from FTP: %s', remote_full_path) if is_path: output_handle.close() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_api_base_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py index 48c5979..7476c90 100644 --- a/airflow/contrib/hooks/gcp_api_base_hook.py +++ b/airflow/contrib/hooks/gcp_api_base_hook.py @@ -12,18 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # - -import logging -import json - import httplib2 from oauth2client.client import GoogleCredentials from oauth2client.service_account import ServiceAccountCredentials from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook +from airflow.utils.log.LoggingMixin import LoggingMixin -class GoogleCloudBaseHook(BaseHook): + +class GoogleCloudBaseHook(BaseHook, LoggingMixin): """ A base hook for Google cloud-related hooks. Google cloud has a shared REST API client that is built in the same way no matter which service you use. @@ -43,7 +41,6 @@ class GoogleCloudBaseHook(BaseHook): Legacy P12 key files are not supported. """ - def __init__(self, conn_id, delegate_to=None): """ :param conn_id: The connection ID to use when fetching connection info. @@ -69,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook): kwargs['sub'] = self.delegate_to if not key_path: - logging.info('Getting connection using `gcloud auth` user, since no key file ' + self.logger.info('Getting connection using `gcloud auth` user, since no key file ' 'is defined for hook.') credentials = GoogleCredentials.get_application_default() else: @@ -77,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook): raise AirflowException('Scope should be defined when using a key file.') scopes = [s.strip() for s in scope.split(',')] if key_path.endswith('.json'): - logging.info('Getting connection using a JSON key file.') + self.logger.info('Getting connection using a JSON key file.') credentials = ServiceAccountCredentials\ .from_json_keyfile_name(key_path, scopes) elif key_path.endswith('.p12'): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataflow_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py index fc73288..66dfb07 100644 --- a/airflow/contrib/hooks/gcp_dataflow_hook.py +++ b/airflow/contrib/hooks/gcp_dataflow_hook.py @@ -11,8 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import logging import select import subprocess import time @@ -21,10 +19,10 @@ import uuid from apiclient.discovery import build from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook +from airflow.utils.log.LoggingMixin import LoggingMixin -class _DataflowJob(object): - +class _DataflowJob(LoggingMixin): def __init__(self, dataflow, project_number, name): self._dataflow = dataflow self._project_number = project_number @@ -49,11 +47,15 @@ class _DataflowJob(object): job = self._dataflow.projects().jobs().get(projectId=self._project_number, jobId=self._job_id).execute() if 'currentState' in job: - logging.info('Google Cloud DataFlow job %s is %s', job['name'], - job['currentState']) + self.logger.info( + 'Google Cloud DataFlow job %s is %s', + job['name'], job['currentState'] + ) else: - logging.info('Google Cloud DataFlow with job_id %s has name %s', self._job_id, - job['name']) + self.logger.info( + 'Google Cloud DataFlow with job_id %s has name %s', + self._job_id, job['name'] + ) return job def wait_for_done(self): @@ -70,7 +72,7 @@ class _DataflowJob(object): elif 'JOB_STATE_RUNNING' == self._job['currentState']: time.sleep(10) else: - logging.debug(str(self._job)) + self.logger.debug(str(self._job)) raise Exception( "Google Cloud Dataflow job {} was unknown state: {}".format( self._job['name'], self._job['currentState'])) @@ -83,8 +85,7 @@ class _DataflowJob(object): return self._job -class _Dataflow(object): - +class _Dataflow(LoggingMixin): def __init__(self, cmd): self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -105,15 +106,15 @@ class _Dataflow(object): def wait_for_done(self): reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()] - logging.info("Start waiting for DataFlow process to complete.") + self.logger.info("Start waiting for DataFlow process to complete.") while self._proc.poll() is None: ret = select.select(reads, [], [], 5) if ret is not None: for fd in ret[0]: line = self._line(fd) - logging.debug(line[:-1]) + self.logger.debug(line[:-1]) else: - logging.info("Waiting for DataFlow process to complete.") + self.logger.info("Waiting for DataFlow process to complete.") if self._proc.returncode is not 0: raise Exception("DataFlow failed with return code {}".format( self._proc.returncode)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index c1d8993..3a1336e 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index c1d8993..3a1336e 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
 import time
 import uuid
 
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class _DataProcJob:
+class _DataProcJob(LoggingMixin):
     def __init__(self, dataproc_api, project_id, job):
         self.dataproc_api = dataproc_api
         self.project_id = project_id
@@ -30,8 +30,10 @@ class _DataProcJob:
             region='global',
             body=job).execute()
         self.job_id = self.job['reference']['jobId']
-        logging.info('DataProc job %s is %s', self.job_id,
-                     str(self.job['status']['state']))
+        self.logger.info(
+            'DataProc job %s is %s',
+            self.job_id, str(self.job['status']['state'])
+        )
 
     def wait_for_done(self):
         while True:
@@ -41,21 +43,23 @@ class _DataProcJob:
                 jobId=self.job_id).execute()
             if 'ERROR' == self.job['status']['state']:
                 print(str(self.job))
-                logging.error('DataProc job %s has errors', self.job_id)
-                logging.error(self.job['status']['details'])
-                logging.debug(str(self.job))
+                self.logger.error('DataProc job %s has errors', self.job_id)
+                self.logger.error(self.job['status']['details'])
+                self.logger.debug(str(self.job))
                 return False
             if 'CANCELLED' == self.job['status']['state']:
                 print(str(self.job))
-                logging.warning('DataProc job %s is cancelled', self.job_id)
+                self.logger.warning('DataProc job %s is cancelled', self.job_id)
                 if 'details' in self.job['status']:
-                    logging.warning(self.job['status']['details'])
-                logging.debug(str(self.job))
+                    self.logger.warning(self.job['status']['details'])
+                self.logger.debug(str(self.job))
                 return False
             if 'DONE' == self.job['status']['state']:
                 return True
-            logging.debug('DataProc job %s is %s', self.job_id,
-                          str(self.job['status']['state']))
+            self.logger.debug(
+                'DataProc job %s is %s',
+                self.job_id, str(self.job['status']['state'])
+            )
             time.sleep(5)
 
     def raise_error(self, message=None):
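_DataProcJob.wait_for_done polls the job state on a fixed interval and chooses the log level by outcome: failures and cancellations surface at ERROR/WARNING, while per-poll chatter stays at DEBUG. A rough standalone sketch of that loop; get_state and the state names are assumptions, not the Dataproc API:

import logging
import time

log = logging.getLogger(__name__)


def wait_for_done(get_state, interval=5):
    # Poll a caller-supplied get_state() callable until a terminal state is reached.
    while True:
        state = get_state()
        if state == 'ERROR':
            log.error('Job finished with errors')
            return False
        if state == 'DONE':
            return True
        log.debug('Job is still %s', state)  # chatty detail stays at DEBUG
        time.sleep(interval)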

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 47d9700..35f31a7 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -13,44 +13,40 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-import logging
 import random
 import time
 
-from airflow import settings
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from apiclient.discovery import build
 from apiclient import errors
+from apiclient.discovery import build
 from oauth2client.client import GoogleCredentials
 
-logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL)
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
 def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
+    log = LoggingMixin().logger
     for i in range(0, max_n):
         try:
             response = request.execute()
             if is_error_func(response):
                 raise ValueError(
-                    'The response contained an error: {}'.format(response))
+                    'The response contained an error: {}'.format(response)
+                )
             elif is_done_func(response):
-                logging.info('Operation is done: {}'.format(response))
+                log.info('Operation is done: %s', response)
                 return response
             else:
                 time.sleep((2**i) + (random.randint(0, 1000) / 1000))
         except errors.HttpError as e:
             if e.resp.status != 429:
-                logging.info(
-                    'Something went wrong. Not retrying: {}'.format(e))
+                log.info('Something went wrong. Not retrying: %s', format(e))
                 raise
             else:
                 time.sleep((2**i) + (random.randint(0, 1000) / 1000))
 
 
 class MLEngineHook(GoogleCloudBaseHook):
-
     def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
         super(MLEngineHook, self).__init__(gcp_conn_id, delegate_to)
         self._mlengine = self.get_conn()
@@ -107,17 +103,20 @@ class MLEngineHook(GoogleCloudBaseHook):
             if use_existing_job_fn is not None:
                 existing_job = self._get_job(project_id, job_id)
                 if not use_existing_job_fn(existing_job):
-                    logging.error(
-                        'Job with job_id {} already exist, but it does '
-                        'not match our expectation: {}'.format(
-                            job_id, existing_job))
+                    self.logger.error(
+                        'Job with job_id %s already exist, but it does '
+                        'not match our expectation: %s',
+                        job_id, existing_job
+                    )
                     raise
-                logging.info(
-                    'Job with job_id {} already exist. Will waiting for it to '
-                    'finish'.format(job_id))
+                self.logger.info(
+                    'Job with job_id %s already exist. Will waiting for it to finish',
+                    job_id
+                )
             else:
-                logging.error('Failed to create MLEngine job: {}'.format(e))
+                self.logger.error('Failed to create MLEngine job: {}'.format(e))
                 raise
+
         return self._wait_for_job_done(project_id, job_id)
 
     def _get_job(self, project_id, job_id):
@@ -140,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
                     # polling after 30 seconds when quota failure occurs
                     time.sleep(30)
                 else:
-                    logging.error('Failed to get MLEngine job: {}'.format(e))
+                    self.logger.error('Failed to get MLEngine job: {}'.format(e))
                     raise
 
     def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -192,11 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
 
         try:
             response = request.execute()
-            logging.info(
-                'Successfully set version: {} to default'.format(response))
+            self.logger.info('Successfully set version: %s to default', response)
             return response
         except errors.HttpError as e:
-            logging.error('Something went wrong: {}'.format(e))
+            self.logger.error('Something went wrong: %s', e)
             raise
 
     def list_versions(self, project_id, model_name):
@@ -264,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
             return request.execute()
         except errors.HttpError as e:
             if e.resp.status == 404:
-                logging.error('Model was not found: {}'.format(e))
+                self.logger.error('Model was not found: %s', e)
                 return None
             raise
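_poll_with_exponential_delay above retries only on HTTP 429 and sleeps 2**i seconds plus up to a second of jitter between attempts. A simplified, self-contained version of that backoff schedule; poll_with_backoff, fetch and is_done are hypothetical names, not part of the hook:

import random
import time


def poll_with_backoff(fetch, is_done, max_attempts=9):
    # Retry fetch() with exponential backoff plus sub-second jitter.
    for i in range(max_attempts):
        response = fetch()
        if is_done(response):
            return response
        time.sleep((2 ** i) + random.randint(0, 1000) / 1000.0)
    raise RuntimeError('operation did not finish within the retry budget')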

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index b5f3edc..eb17c3b 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -12,17 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-
 from apiclient.discovery import build
 from apiclient.http import MediaFileUpload
 from googleapiclient import errors
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
-logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
-
 
 class GoogleCloudStorageHook(GoogleCloudBaseHook):
     """
@@ -187,8 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             ts = ts.replace(tzinfo=dateutil.tz.tzutc())
 
         updated = dateutil.parser.parse(response['updated'])
-        logging.log(logging.INFO, "Verify object date: " + str(updated)
-                    + " > " + str(ts))
+        self.logger.info("Verify object date: %s > %s", updated, ts)
 
         if updated > ts:
             return True
@@ -253,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             ).execute()
 
             if 'items' not in response:
-                logging.info("No items found for prefix:{}".format(prefix))
+                self.logger.info("No items found for prefix: %s", prefix)
                 break
 
             for item in response['items']:
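The gcs_hook change also drops the module-level setLevel() call; per the commit message, levels are meant to be configured once in the settings/configuration code rather than by individual hooks. One conventional standard-library way to do that at the application entry point, sketched here with made-up handler and logger names:

import logging.config

# Hypothetical central configuration; library modules then only ask for loggers.
logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {'class': 'logging.StreamHandler'},
    },
    'loggers': {
        'airflow': {'handlers': ['console'], 'level': 'INFO'},
    },
})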

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 148101b..8702608 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -11,24 +11,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from jira import JIRA
 from jira.exceptions import JIRAError
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class JiraHook(BaseHook):
+class JiraHook(BaseHook, LoggingMixin):
     """
     Jira interaction hook, a Wrapper around JIRA Python SDK.
 
     :param jira_conn_id: reference to a pre-defined Jira Connection
     :type jira_conn_id: string
     """
-
     def __init__(self, jira_conn_id='jira_default'):
         super(JiraHook, self).__init__(jira_conn_id)
@@ -38,7 +35,7 @@ class JiraHook(BaseHook):
 
     def get_conn(self):
         if not self.client:
-            logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id))
+            self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
 
             get_server_info = True
             validate = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index c51a757..1a5e7ec 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -16,12 +16,12 @@
 import os
 import time
 import datetime
-import logging
 import six
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
 from airflow.utils.state import State
 
 from qds_sdk.qubole import Qubole
@@ -68,7 +68,7 @@ COMMAND_ARGS = {
 }
 
 
-class QuboleHook(BaseHook):
+class QuboleHook(BaseHook, LoggingMixin):
     def __init__(self, *args, **kwargs):
         conn = self.get_connection(kwargs['qubole_conn_id'])
         Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -84,31 +84,33 @@ class QuboleHook(BaseHook):
         cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
 
         if cmd_id is not None:
-            logger = logging.getLogger("QuboleHook")
             cmd = Command.find(cmd_id)
             if cmd is not None:
+                log = LoggingMixin().logger
                 if cmd.status == 'done':
-                    logger.info('Command ID: %s has been succeeded, hence marking this '
+                    log.info('Command ID: %s has been succeeded, hence marking this '
                                 'TI as Success.', cmd_id)
                     ti.state = State.SUCCESS
                 elif cmd.status == 'running':
-                    logger.info('Cancelling the Qubole Command Id: %s', cmd_id)
+                    log.info('Cancelling the Qubole Command Id: %s', cmd_id)
                     cmd.cancel()
 
     def execute(self, context):
         args = self.cls.parse(self.create_cmd_args(context))
         self.cmd = self.cls.create(**args)
         context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
-        logging.info("Qubole command created with Id: %s and Status: %s",
-                     self.cmd.id, self.cmd.status)
+        self.logger.info(
+            "Qubole command created with Id: %s and Status: %s",
+            self.cmd.id, self.cmd.status
+        )
 
         while not Command.is_done(self.cmd.status):
             time.sleep(Qubole.poll_interval)
             self.cmd = self.cls.find(self.cmd.id)
-            logging.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+            self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
 
         if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
-            logging.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+            self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
 
         if self.cmd.status != 'done':
             raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -124,7 +126,7 @@ class QuboleHook(BaseHook):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id",
                                   task_ids=ti.task_id)
             self.cmd = self.cls.find(cmd_id)
 
         if self.cls and self.cmd:
-            logging.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+            self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
             self.cmd.cancel()
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
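Where no instance is at hand (module-level helpers, or classmethods such as QuboleHook.handle_failure_retry above), the commit grabs a logger via LoggingMixin().logger rather than logging.getLogger. A small stand-alone illustration of that fallback; the mixin again stands in for the Airflow one, and SomeHook is an invented name:

import logging


class LoggingMixin(object):
    # Stand-in for airflow.utils.log.LoggingMixin.
    @property
    def logger(self):
        return logging.getLogger(self.__class__.__name__)


class SomeHook(LoggingMixin):
    @classmethod
    def handle_failure_retry(cls, context):
        # No instance is available here, so use a throwaway mixin logger,
        # mirroring the QuboleHook diff above.
        log = LoggingMixin().logger
        log.info('Cleaning up after task %s', context.get('task_id'))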

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index 936eff8..a8999d6 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -15,16 +15,14 @@
 """
 RedisHook module
 """
-
-import logging
-
 from redis import StrictRedis
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class RedisHook(BaseHook):
+class RedisHook(BaseHook, LoggingMixin):
     """
     Hook to interact with Redis database
     """
@@ -42,7 +40,7 @@ class RedisHook(BaseHook):
         self.port = int(conn.port)
         self.password = conn.password
         self.db = int(conn.extra_dejson.get('db', 0))
-        self.logger = logging.getLogger(__name__)
+
         self.logger.debug(
             '''Connection "{conn}":
             \thost: {host}
@@ -62,11 +60,9 @@ class RedisHook(BaseHook):
         """
         if not self.client:
             self.logger.debug(
-                'generating Redis client for conn_id "{conn}" on '
-                '{host}:{port}:{db}'.format(conn=self.redis_conn_id,
-                                            host=self.host,
-                                            port=self.port,
-                                            db=self.db))
+                'generating Redis client for conn_id "%s" on %s:%s:%s',
+                self.redis_conn_id, self.host, self.port, self.db
+            )
             try:
                 self.client = StrictRedis(
                     host=self.host,
