casassg commented on a change in pull request #10587:
URL: https://github.com/apache/airflow/pull/10587#discussion_r506623798
##########
File path: tests/models/test_dagparam.py
##########
@@ -0,0 +1,80 @@
+import unittest
+from datetime import datetime, timedelta
+
+from airflow.jobs.scheduler_job import TI
+from airflow.models import DagRun
+from airflow.models.dag import DAG
+from airflow.models.dagparam import dag as dag_decorator
+from airflow.operators.python import (
+ PythonOperator,
+ task)
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.test_utils.config import conf_vars
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.types import DagRunType
+from tests.test_utils.db import clear_db_runs
+
+DEFAULT_ARGS = {
+ "owner": "test",
+ "depends_on_past": True,
+ "start_date": timezone.utcnow(),
+ "retries": 1,
+ "retry_delay": timedelta(minutes=1),
+}
+DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+
+VALUE = 42
+
+
+class TestDagParamRuntime(unittest.TestCase):
+
+ def tearDown(self):
+ super().tearDown()
+ clear_db_runs()
+
+ def test_xcom_pass_to_op(self):
+ with DAG(dag_id="test_xcom_pass_to_op", default_args=DEFAULT_ARGS) as
dag:
+ value = dag.param('value', default=VALUE)
+ @task
+ def return_num(num):
+ return num
+
+ xcom_arg = return_num(value)
+
+ dr = dag.create_dagrun(
+ run_id=DagRunType.MANUAL.value,
+ start_date=timezone.utcnow(),
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING
+ )
+
+ xcom_arg.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+ ti = dr.get_task_instances()[0]
+ assert ti.xcom_pull() == VALUE
+
+
+class TestDagDecorator:
+ @conf_vars({("core", "executor"): "DebugExecutor"})
+ def test_xcom_pass_to_op(self):
+
+ @dag_decorator(default_args=DEFAULT_ARGS)
+ def test_pipeline(some_param, other_param=VALUE):
Review comment:
Yup. I can take a first pass at adding documentation so this is clear
##########
File path: airflow/models/dagparam.py
##########
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Callable
+
+from inspect import signature
+
+
+import functools
+
+
+class DagParam:
+ """
+ Class that represents a DAG run parameter.
+
+ It can be used to parametrized your dags.
+
+ **Example**:
+
+ with DAG(...) as dag:
+ EmailOperator(subject=dag.param('subject', 'Hi from Airflow!'))
+
+ This object can be used in legacy Operators via Jinja.
+
+ :param current_dag: Dag that will be used to pull the parameter from.
+ :type current_dag: airflow.models.dag.DAG
+ :param name: key value which is used to set the parameter
+ :type name: str
+ :param default: Default value used if no parameter was set.
+ :type default: Any
+ """
+
+ def __init__(self, current_dag, name: str, default: Any):
+ current_dag.params[name] = default
+ self._name = name
+ self._default = default
+
+ def resolve(self, context: Dict) -> Any:
+ """
+ Pull DagParam value from DagRun context. This method is run during
``op.execute()``
+ in respectable context.
+ """
+ return context.get('params', {}).get(self._name, self._default)
+
+
+def dag(*dag_args, **dag_kwargs):
Review comment:
Moved and added the import
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]