rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add 
operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r367030797

 File path: airflow/sensors/external_task_sensor.py
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 from sqlalchemy import func
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, 
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
+def get_possible_target_execution_dates(execution_date, execution_delta, 
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+    return dttm if isinstance(dttm, list) else [dttm]
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 Review comment:
   For now I've opted for ensuring the link isn't added if `execution_date_fn` 
is provided @yuqian90 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

Reply via email to