Repository: incubator-airflow
Updated Branches:
  refs/heads/master 76b68b82d -> 06aec8ea6


[AIRFLOW-2429] Make Airflow flake8 compliant

Closes #3342 from feng-tao/airflow-2429

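The changes are mechanical: imports are regrouped, bare excepts are narrowed,
and long lines are rewrapped until flake8 reports no errors. To re-run the
check locally, one option is flake8's legacy Python API; a minimal sketch,
assuming the project's configured line length (the value below is a guess,
not taken from this commit):

    # Hypothetical local check, not part of this commit.
    from flake8.api import legacy as flake8

    style_guide = flake8.get_style_guide(max_line_length=90)
    report = style_guide.check_files(['airflow/models.py'])
    print(report.total_errors)  # 0 once the file is compliant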

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/06aec8ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/06aec8ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/06aec8ea

Branch: refs/heads/master
Commit: 06aec8ea6b4d8f551a1360f79a1a58114f614753
Parents: 76b68b8
Author: Tao feng <tf...@lyft.com>
Authored: Sat May 19 00:29:59 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Sat May 19 00:30:05 2018 +0200

----------------------------------------------------------------------
 airflow/configuration.py |  24 +++----
 airflow/models.py        | 145 ++++++++++++++++++++++--------------------
 airflow/version.py       |   4 +-
 3 files changed, 87 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 20ef067..e19a8b1 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -22,31 +22,27 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+from builtins import str
+from collections import OrderedDict
 import copy
 import errno
+from future import standard_library
 import os
-import subprocess
-import warnings
 import shlex
-import sys
-
-from future import standard_library
-
 import six
 from six import iteritems
+import subprocess
+import sys
+import warnings
+
 from backports.configparser import ConfigParser
 from zope.deprecation import deprecated as _deprecated
 
+from airflow.exceptions import AirflowConfigException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 
-from builtins import str
-from collections import OrderedDict
-
-from airflow.exceptions import AirflowConfigException
-
-
 log = LoggingMixin().log
 
 # show Airflow's deprecation warnings
@@ -323,8 +319,8 @@ class AirflowConfigParser(ConfigParser):
                 opt = None
             if opt:
                 if (
-                        not display_sensitive
-                        and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
+                    not display_sensitive and
+                        ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
                     opt = '< hidden >'
                 if display_source:
                     opt = (opt, 'env var')

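The import reshuffle above targets flake8's E402 ("module level import not at
top of file"): all imports now precede the first executable statement, and the
standard_library.install_aliases() call moves after them. A toy module, not
from the commit, showing what E402 flags:

    import os

    os.environ.setdefault('AIRFLOW_HOME', '/tmp/airflow')  # executable statement

    import sys  # E402: module level import not at top of file

    print(sys.executable, os.getcwd())
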
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7aab4b5..4c1be8e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -28,6 +28,7 @@ from builtins import str
 from builtins import object, bytes
 import copy
 from collections import namedtuple, defaultdict
+import cryptography
 from datetime import timedelta
 
 import dill
@@ -59,7 +60,6 @@ from sqlalchemy import (
     Index, Float, LargeBinary)
 from sqlalchemy import func, or_, and_, true as sqltrue
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
-from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
 from sqlalchemy_utc import UtcDateTime
 
@@ -77,7 +77,6 @@ from airflow.lineage import apply_lineage, prepare_lineage
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
-from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
 
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils import timezone
@@ -86,7 +85,7 @@ from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.email import send_email
 from airflow.utils.helpers import (
-    as_tuple, is_container, is_in, validate_key, pprinttable)
+    as_tuple, is_container, validate_key, pprinttable)
 from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
@@ -103,6 +102,7 @@ XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
 
+
 def get_fernet():
     """
     Deferred load of Fernet key.
@@ -115,7 +115,7 @@ def get_fernet():
     """
     try:
         from cryptography.fernet import Fernet
-    except:
+    except ImportError:
         raise AirflowException('Failed to import Fernet, it may not be installed')
     try:
         return Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
@@ -126,6 +126,7 @@ def get_fernet():
 # Used by DAG context_managers
 _CONTEXT_MANAGER_DAG = None
 
+
 def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
     """
     Clears a set of task instances, but makes sure the running ones
@@ -247,7 +248,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                 filepath=orm_dag.fileloc, only_if_updated=False)
 
             # If the source file no longer exports `dag_id`, delete it from self.dags
-            if found_dags and dag_id in [dag.dag_id for dag in found_dags]:
+            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                 return self.dags[dag_id]
             elif dag_id in self.dags:
                 del self.dags[dag_id]
@@ -354,7 +355,6 @@ class DagBag(BaseDagBag, LoggingMixin):
                         self.file_last_changed[dag.full_filepath] = \
                             file_last_changed_on_disk
 
-
         self.file_last_changed[filepath] = file_last_changed_on_disk
         return found_dags
 
@@ -429,7 +429,6 @@ class DagBag(BaseDagBag, LoggingMixin):
                         del self.dags[subdag.dag_id]
             raise cycle_exception
 
-
     def collect_dags(
             self,
             dag_folder=None,
@@ -644,7 +643,7 @@ class Connection(Base, LoggingMixin):
         if self._password and self.is_encrypted:
             try:
                 fernet = get_fernet()
-            except:
+            except AirflowException:
                 raise AirflowException(
                     "Can't decrypt encrypted password for login={}, \
                     FERNET_KEY configuration is missing".format(self.login))
@@ -660,7 +659,7 @@ class Connection(Base, LoggingMixin):
                 self.is_encrypted = True
             except AirflowException:
                 self.log.exception("Failed to load fernet while encrypting 
value, "
-                                    "using non-encrypted value.")
+                                   "using non-encrypted value.")
                 self._password = value
                 self.is_encrypted = False
 
@@ -673,7 +672,7 @@ class Connection(Base, LoggingMixin):
         if self._extra and self.is_extra_encrypted:
             try:
                 fernet = get_fernet()
-            except:
+            except AirflowException:
                 raise AirflowException(
                     "Can't decrypt `extra` params for login={},\
                     FERNET_KEY configuration is missing".format(self.login))
@@ -689,7 +688,7 @@ class Connection(Base, LoggingMixin):
                 self.is_extra_encrypted = True
             except AirflowException:
                 self.log.exception("Failed to load fernet while encrypting 
value, "
-                                    "using non-encrypted value.")
+                                   "using non-encrypted value.")
                 self._extra = value
                 self.is_extra_encrypted = False
         else:
@@ -757,7 +756,7 @@ class Connection(Base, LoggingMixin):
             elif self.conn_type == 'cassandra':
                 from airflow.contrib.hooks.cassandra_hook import CassandraHook
                 return CassandraHook(cassandra_conn_id=self.conn_id)
-        except:
+        except Exception:
             pass
 
     def __repr__(self):
@@ -1330,8 +1329,11 @@ class TaskInstance(Base, LoggingMixin):
         if self.task.retry_exponential_backoff:
             min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
             # deterministic per task instance
-            hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
-                self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
+            hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
+                                                         self.task_id,
+                                                         self.execution_date,
+                                                         self.try_number)
+                                    .encode('utf-8')).hexdigest(), 16)
             # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
             modded_hash = min_backoff + hash % min_backoff
             # timedelta has a maximum representable value. The exponentiation
@@ -1453,7 +1455,7 @@ class TaskInstance(Base, LoggingMixin):
             session.commit()
             return False
 
-        #TODO: Logging needs cleanup, not clear what is being printed
+        # TODO: Logging needs cleanup, not clear what is being printed
         hr = "\n" + ("-" * 80) + "\n"  # Line break
 
         # For reporting purposes, we report based on 1-indexed,
@@ -1518,7 +1520,8 @@ class TaskInstance(Base, LoggingMixin):
         settings.engine.dispose()
         if verbose:
             if mark_success:
-                msg = "Marking success for {} on {}".format(self.task, 
self.execution_date)
+                msg = "Marking success for {} on {}".format(self.task,
+                                                            
self.execution_date)
                 self.log.info(msg)
             else:
                 msg = "Executing {} on {}".format(self.task, 
self.execution_date)
@@ -1661,23 +1664,23 @@ class TaskInstance(Base, LoggingMixin):
             pool=None,
             session=None):
         res = self._check_and_change_state_before_execution(
-                verbose=verbose,
-                ignore_all_deps=ignore_all_deps,
-                ignore_depends_on_past=ignore_depends_on_past,
-                ignore_task_deps=ignore_task_deps,
-                ignore_ti_state=ignore_ti_state,
+            verbose=verbose,
+            ignore_all_deps=ignore_all_deps,
+            ignore_depends_on_past=ignore_depends_on_past,
+            ignore_task_deps=ignore_task_deps,
+            ignore_ti_state=ignore_ti_state,
+            mark_success=mark_success,
+            test_mode=test_mode,
+            job_id=job_id,
+            pool=pool,
+            session=session)
+        if res:
+            self._run_raw_task(
                 mark_success=mark_success,
                 test_mode=test_mode,
                 job_id=job_id,
                 pool=pool,
                 session=session)
-        if res:
-            self._run_raw_task(
-                    mark_success=mark_success,
-                    test_mode=test_mode,
-                    job_id=job_id,
-                    pool=pool,
-                    session=session)
 
     def dry_run(self):
         task = self.task
@@ -2074,7 +2077,7 @@ class SkipMixin(LoggingMixin):
                 TaskInstance.dag_id == dag_run.dag_id,
                 TaskInstance.execution_date == dag_run.execution_date,
                 TaskInstance.task_id.in_(task_ids)
-            ).update({TaskInstance.state : State.SKIPPED,
+            ).update({TaskInstance.state: State.SKIPPED,
                       TaskInstance.start_date: now,
                       TaskInstance.end_date: now},
                      synchronize_session=False)
@@ -2732,8 +2735,7 @@ class BaseOperator(LoggingMixin):
         return self._downstream_task_ids
 
     @provide_session
-    def clear(
-              self,
+    def clear(self,
               start_date=None,
               end_date=None,
               upstream=False,
@@ -2810,7 +2812,6 @@ class BaseOperator(LoggingMixin):
         return list(map(lambda task_id: self._dag.task_dict[task_id],
                         self.get_flat_relative_ids(upstream)))
 
-
     def run(
             self,
             start_date=None,
@@ -2970,8 +2971,9 @@ class DagModel(Base):
     dag_id = Column(String(ID_LEN), primary_key=True)
     # A DAG can be paused from the UI / DB
     # Set this default value of is_paused based on a configuration value!
-    is_paused_at_creation = configuration.conf.getboolean('core',
-                                                     'dags_are_paused_at_creation')
+    is_paused_at_creation = configuration.conf\
+        .getboolean('core',
+                    'dags_are_paused_at_creation')
     is_paused = Column(Boolean, default=is_paused_at_creation)
     # Whether the DAG is a subdag
     is_subdag = Column(Boolean, default=False)
@@ -3072,7 +3074,8 @@ class DAG(BaseDag, LoggingMixin):
     :param sla_miss_callback: specify a function to call when reporting SLA
         timeouts.
     :type sla_miss_callback: types.FunctionType
-    :param default_view: Specify DAG default view (tree, graph, duration, gantt, landing_times)
+    :param default_view: Specify DAG default view (tree, graph, duration,
+                                                   gantt, landing_times)
     :type default_view: string
     :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
     :type orientation: string
@@ -3539,14 +3542,14 @@ class DAG(BaseDag, LoggingMixin):
         # Check SubDag for class but don't check class directly, see
         # https://github.com/airbnb/airflow/issues/1168
         from airflow.operators.subdag_operator import SubDagOperator
-        l = []
+        subdag_lst = []
         for task in self.tasks:
             if (isinstance(task, SubDagOperator) or
-                #TODO remove in Airflow 2.0
-                type(task).__name__ == 'SubDagOperator'):
-                l.append(task.subdag)
-                l += task.subdag.subdags
-        return l
+                    # TODO remove in Airflow 2.0
+                    type(task).__name__ == 'SubDagOperator'):
+                subdag_lst.append(task.subdag)
+                subdag_lst += task.subdag.subdags
+        return subdag_lst
 
     def resolve_template_files(self):
         for t in self.tasks:
@@ -4276,13 +4279,13 @@ class Variable(Base, LoggingMixin):
         if self._val and self.is_encrypted:
             try:
                 fernet = get_fernet()
-            except:
+            except Exception:
                 log.error("Can't decrypt _val for key={}, FERNET_KEY "
                           "configuration missing".format(self.key))
                 return None
             try:
                 return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
-            except:
+            except cryptography.fernet.InvalidToken:
                 log.error("Can't decrypt _val for key={}, invalid token "
                           "or value".format(self.key))
                 return None
@@ -4297,7 +4300,8 @@ class Variable(Base, LoggingMixin):
                 self.is_encrypted = True
             except AirflowException:
                 self.log.exception(
-                    "Failed to load fernet while encrypting value, using 
non-encrypted value."
+                    "Failed to load fernet while encrypting value, "
+                    "using non-encrypted value."
                 )
                 self._val = value
                 self.is_encrypted = False
@@ -4323,7 +4327,8 @@ class Variable(Base, LoggingMixin):
         :return: Mixed
         """
         default_sentinel = object()
-        obj = Variable.get(key, default_var=default_sentinel, deserialize_json=deserialize_json)
+        obj = Variable.get(key, default_var=default_sentinel,
+                           deserialize_json=deserialize_json)
         if obj is default_sentinel:
             if default is not None:
                 Variable.set(key, default, serialize_json=deserialize_json)
@@ -4449,8 +4454,7 @@ class XCom(Base, LoggingMixin):
 
     @classmethod
     @provide_session
-    def get_one(
-                cls,
+    def get_one(cls,
                 execution_date,
                 key=None,
                 task_id=None,
@@ -4460,9 +4464,11 @@ class XCom(Base, LoggingMixin):
                 session=None):
         """
         Retrieve an XCom value, optionally meeting certain criteria.
-        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" 
will be removed in Airflow 2.0.
+        TODO: "pickling" has been deprecated and JSON is preferred.
+              "pickling" will be removed in Airflow 2.0.
 
-        :param enable_pickling: If pickling is not enabled, the XCOM value will be parsed to JSON instead.
+        :param enable_pickling: If pickling is not enabled,
+                                the XCOM value will be parsed to JSON instead.
         :return: XCom value
         """
         filters = []
@@ -4478,9 +4484,8 @@ class XCom(Base, LoggingMixin):
             filters.append(cls.execution_date == execution_date)
 
         query = (
-            session.query(cls.value)
-                .filter(and_(*filters))
-                .order_by(cls.execution_date.desc(), cls.timestamp.desc()))
+            session.query(cls.value).filter(and_(*filters))
+                   .order_by(cls.execution_date.desc(), cls.timestamp.desc()))
 
         result = query.first()
         if result:
@@ -4504,19 +4509,19 @@ class XCom(Base, LoggingMixin):
 
     @classmethod
     @provide_session
-    def get_many(
-                cls,
-                execution_date,
-                key=None,
-                task_ids=None,
-                dag_ids=None,
-                include_prior_dates=False,
-                limit=100,
-                enable_pickling=None,
-                session=None):
+    def get_many(cls,
+                 execution_date,
+                 key=None,
+                 task_ids=None,
+                 dag_ids=None,
+                 include_prior_dates=False,
+                 limit=100,
+                 enable_pickling=None,
+                 session=None):
         """
         Retrieve an XCom value, optionally meeting certain criteria
-        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" 
will be removed in Airflow 2.0.
+        TODO: "pickling" has been deprecated and JSON is preferred.
+              "pickling" will be removed in Airflow 2.0.
         """
         filters = []
         if key:
@@ -4531,10 +4536,9 @@ class XCom(Base, LoggingMixin):
             filters.append(cls.execution_date == execution_date)
 
         query = (
-            session.query(cls)
-                .filter(and_(*filters))
-                .order_by(cls.execution_date.desc(), cls.timestamp.desc())
-                .limit(limit))
+            session.query(cls).filter(and_(*filters))
+                              .order_by(cls.execution_date.desc(), cls.timestamp.desc())
+                              .limit(limit))
         results = query.all()
         if enable_pickling is None:
             enable_pickling = configuration.conf.getboolean(
@@ -4625,7 +4629,7 @@ class DagStat(Base):
             if dag_ids:
                 qry = qry.filter(DagStat.dag_id.in_(set(dag_ids)))
             if dirty_only:
-                qry = qry.filter(DagStat.dirty == True)
+                qry = qry.filter(DagStat.dirty == True) # noqa
 
             qry = qry.with_for_update().all()
 
@@ -4919,7 +4923,8 @@ class DagRun(Base, LoggingMixin):
             session=session
         )
         none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
-        none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
+        none_task_concurrency = all(t.task.task_concurrency is None
+                                    for t in unfinished_tasks)
         # small speed up
         if unfinished_tasks and none_depends_on_past and none_task_concurrency:
             # todo: this can actually get pretty slow: one task costs between 0.01-015s
@@ -5036,7 +5041,7 @@ class DagRun(Base, LoggingMixin):
         """
         qry = session.query(DagRun).filter(
             DagRun.dag_id == dag_id,
-            DagRun.external_trigger == False,
+            DagRun.external_trigger == False, # noqa
             DagRun.execution_date == execution_date,
         )
         return qry.first()
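
Most of the substantive edits above narrow bare `except:` clauses, which
flake8 rejects as E722 because a bare except also swallows SystemExit and
KeyboardInterrupt. A standalone sketch of the pattern (imports added here for
completeness; not a line-for-line copy of the commit):

    from airflow.exceptions import AirflowException

    def load_fernet_class():
        try:
            from cryptography.fernet import Fernet  # optional dependency
        except ImportError:  # previously a bare `except:`
            raise AirflowException('Failed to import Fernet, it may not be installed')
        return Fernet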

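The two `# noqa` markers above are worth a note: flake8's E712 normally
rejects `== True` / `== False`, but SQLAlchemy overloads `==` on columns to
build SQL expressions, so rewriting these comparisons to `is True` would
silently break the query. A self-contained sketch (the model below is
illustrative, not Airflow's):

    from sqlalchemy import Boolean, Column, Integer
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class DagStatSketch(Base):
        __tablename__ = 'dag_stat_sketch'
        id = Column(Integer, primary_key=True)
        dirty = Column(Boolean, default=False)

    # Builds the SQL predicate "dag_stat_sketch.dirty = true";
    # `DagStatSketch.dirty is True` would evaluate in Python and be False.
    print(DagStatSketch.dirty == True)  # noqa: E712
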
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/06aec8ea/airflow/version.py
----------------------------------------------------------------------
diff --git a/airflow/version.py b/airflow/version.py
index 750da36..d11d766 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
