This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7930234  Remove redundant "and_" condition when using filter (#10232)
7930234 is described below

commit 7930234726c5e9cb9745cc7944047ac343ab832a
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Aug 8 18:05:36 2020 +0100

    Remove redundant "and_" condition when using filter (#10232)
    
    Multiple criteria may be specified as comma separated; the effect is that 
they will be joined together using the and_() function ( 
https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.filter)
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py | 6 +++---
 airflow/api_connexion/endpoints/xcom_endpoint.py    | 7 +++----
 airflow/jobs/scheduler_job.py                       | 4 ++--
 airflow/models/dagcode.py                           | 6 +++---
 airflow/models/renderedtifields.py                  | 8 ++++----
 5 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 7c08423..c4ed07d 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -17,7 +17,7 @@
 from connexion import NoContent
 from flask import request
 from marshmallow import ValidationError
-from sqlalchemy import and_, func
+from sqlalchemy import func
 
 from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, 
NotFound
 from airflow.api_connexion.parameters import check_limit, format_datetime, 
format_parameters
@@ -36,7 +36,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
     """
     if (
         session.query(DagRun)
-            .filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
+            .filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)
             .delete() == 0
     ):
         raise NotFound(detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: 
'{dag_run_id}' not found")
@@ -162,7 +162,7 @@ def post_dag_run(dag_id, session):
     post_body = dagrun_schema.load(request.json, session=session)
     dagrun_instance = (
         session.query(DagRun).filter(
-            and_(DagRun.dag_id == dag_id, DagRun.run_id == 
post_body["run_id"])).first()
+            DagRun.dag_id == dag_id, DagRun.run_id == 
post_body["run_id"]).first()
     )
     if not dagrun_instance:
         dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL.value, 
**post_body)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py 
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 13dff43..0c20192 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -74,10 +74,9 @@ def get_xcom_entry(
     """
     Get an XCom entry
     """
-    query = session.query(XCom)
-    query = query.filter(and_(XCom.dag_id == dag_id,
-                              XCom.task_id == task_id,
-                              XCom.key == xcom_key))
+    query = session.query(XCom).filter(XCom.dag_id == dag_id,
+                                       XCom.task_id == task_id,
+                                       XCom.key == xcom_key)
     query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.execution_date 
== DR.execution_date))
     query = query.filter(DR.run_id == dag_run_id)
 
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 90b2e1c..1642292 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1131,11 +1131,11 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
             subq = query.subquery()
             tis_changed = session \
                 .query(models.TaskInstance) \
-                .filter(and_(
+                .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
-                    subq.c.execution_date)) \
+                    subq.c.execution_date) \
                 .update({models.TaskInstance.state: new_state}, 
synchronize_session=False)
             session.commit()
 
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 5146ebe..8defbeb 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -20,7 +20,7 @@ import struct
 from datetime import datetime
 from typing import Iterable, Optional
 
-from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+from sqlalchemy import BigInteger, Column, String, UnicodeText, exists
 
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.base import Base
@@ -151,8 +151,8 @@ class DagCode(Base):
         log.debug("Deleting code from %s table ", cls.__tablename__)
 
         session.query(cls).filter(
-            and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
-                 
cls.fileloc.notin_(alive_dag_filelocs))).delete(synchronize_session='fetch')
+            cls.fileloc_hash.notin_(alive_fileloc_hashes),
+            
cls.fileloc.notin_(alive_dag_filelocs)).delete(synchronize_session='fetch')
 
     @classmethod
     @provide_session
diff --git a/airflow/models/renderedtifields.py 
b/airflow/models/renderedtifields.py
index 3647dff..e4f1f00 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -120,10 +120,10 @@ class RenderedTaskInstanceFields(Base):
             subq1 = tis_to_keep_query.subquery('subq1')
 
             session.query(cls) \
-                .filter(and_(
+                .filter(
                     cls.dag_id == dag_id,
                     cls.task_id == task_id,
-                    tuple_(cls.dag_id, cls.task_id, 
cls.execution_date).notin_(subq1))) \
+                    tuple_(cls.dag_id, cls.task_id, 
cls.execution_date).notin_(subq1)) \
                 .delete(synchronize_session=False)
         elif session.bind.dialect.name in ["mysql"]:
             # Fetch Top X records given dag_id & task_id ordered by Execution 
Date
@@ -140,10 +140,10 @@ class RenderedTaskInstanceFields(Base):
             )
 
             session.query(cls) \
-                .filter(and_(
+                .filter(
                     cls.dag_id == dag_id,
                     cls.task_id == task_id,
-                    tuple_(cls.dag_id, cls.task_id, 
cls.execution_date).notin_(subq2))) \
+                    tuple_(cls.dag_id, cls.task_id, 
cls.execution_date).notin_(subq2)) \
                 .delete(synchronize_session=False)
         else:
             # Fetch Top X records given dag_id & task_id ordered by Execution 
Date

Reply via email to