turbaszek commented on a change in pull request #10587:
URL: https://github.com/apache/airflow/pull/10587#discussion_r506639557



##########
File path: airflow/models/dag.py
##########
@@ -2187,6 +2199,33 @@ def calculate_dagrun_date_fields(
 
         log.info("Setting next_dagrun for %s to %s", dag.dag_id, 
self.next_dagrun)
 
+def dag(*dag_args, **dag_kwargs):
+    """
+    Python dag decorator. Wraps a function into an Airflow DAG.
+    Accepts kwargs for operator kwarg. Can be used to parametrize DAGs.
+
+    :param dag_args: Arguments for DAG object
+    :type dag_args: list
+    :param dag_kwargs: Kwargs for DAG object.
+    :type dag_kwargs: dict
+    """
+    def wrapper(f: Callable):
+        dag_sig = signature(DAG.__init__)
+        dag_sig = dag_sig.bind_partial(*dag_args, **dag_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):
+            f_sig = signature(f).bind(*args, **kwargs)
+            f_sig.apply_defaults()
+            with DAG(*dag_sig.args, dag_id=f.__name__, **dag_sig.kwargs) as 
dag_obj:
+                f_kwargs = {}
+                for name, value in f_sig.arguments.items():
+                    f_kwargs[name] = dag_obj.param(name, value)
+                f(**f_kwargs)
+            return dag_obj

Review comment:
       Can you add some comments describing what's going on here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to