Lee-W commented on code in PR #54702:
URL: https://github.com/apache/airflow/pull/54702#discussion_r2290163148
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else ''}"
Review Comment:
```suggestion
return f"Every {int(seconds)} second{'s' if seconds > 1 else ''}"
```
##########
airflow-core/src/airflow/cli/cli_config.py:
##########
@@ -988,6 +994,12 @@ class GroupCommand(NamedTuple):
func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
),
+ ActionCommand(
+ name="docs",
+ help="Show DAG documentation (__doc_md__) in markdown format",
Review Comment:
```suggestion
help="Show Dag documentation (__doc_md__) in markdown format",
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
Review Comment:
```suggestion
"""Display Dag documentation (__doc_md__) at the command line."""
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
Review Comment:
```suggestion
return str(timetable.__class__.__name__)
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
+ dagbag = DagBag(process_subdir(args.subdir))
+ if dagbag.import_errors:
+ from rich import print as rich_print
+
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+ dags = sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id"))
+
+ # Generate markdown output
+ markdown_output = []
+
+ if len(dags) > 1:
+ markdown_output.append("# DAG Documentation\n")
+
+ for dag in dags:
+ doc_md = getattr(dag, '__doc_md__', None) or getattr(dag, 'doc_md',
None)
+
+ # Add DAG header
+ if len(dags) > 1:
+ markdown_output.append(f"## {dag.dag_id}\n")
+ else:
+ markdown_output.append(f"# {dag.dag_id}\n")
+
+ # Add schedule information
+ schedule_info = _get_schedule_info(dag)
+ if schedule_info:
+ markdown_output.append(f"**Schedule:** {schedule_info}\n\n")
+
+ # Add documentation content
+ if doc_md:
+ # Ensure proper spacing and formatting, dedent common whitespace
+ doc_content = textwrap.dedent(doc_md).strip()
+ markdown_output.append(f"{doc_content}\n\n")
+ else:
+ markdown_output.append("*No documentation available*\n\n")
+
+ # Add separator between DAGs (except for the last one)
Review Comment:
```suggestion
# Add separator between Dags (except for the last one)
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
+ dagbag = DagBag(process_subdir(args.subdir))
+ if dagbag.import_errors:
+ from rich import print as rich_print
+
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+ dags = sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id"))
+
+ # Generate markdown output
+ markdown_output = []
+
+ if len(dags) > 1:
+ markdown_output.append("# DAG Documentation\n")
+
+ for dag in dags:
+ doc_md = getattr(dag, '__doc_md__', None) or getattr(dag, 'doc_md',
None)
+
+ # Add DAG header
+ if len(dags) > 1:
+ markdown_output.append(f"## {dag.dag_id}\n")
+ else:
+ markdown_output.append(f"# {dag.dag_id}\n")
+
+ # Add schedule information
+ schedule_info = _get_schedule_info(dag)
+ if schedule_info:
+ markdown_output.append(f"**Schedule:** {schedule_info}\n\n")
+
+ # Add documentation content
+ if doc_md:
+ # Ensure proper spacing and formatting, dedent common whitespace
+ doc_content = textwrap.dedent(doc_md).strip()
+ markdown_output.append(f"{doc_content}\n\n")
+ else:
+ markdown_output.append("*No documentation available*\n\n")
+
+ # Add separator between DAGs (except for the last one)
+ if len(dags) > 1 and dag != dags[-1]:
+ markdown_output.append("---\n")
+
+ # Output the markdown
+ markdown_content = "".join(markdown_output)
+
+ if args.output_file:
+ # Write to file
+ with open(args.output_file, 'w', encoding='utf-8') as f:
+ f.write(markdown_content)
+ print(f"DAG documentation written to: {args.output_file}")
Review Comment:
```suggestion
print(f"Dag documentation written to: {args.output_file}")
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
Review Comment:
```suggestion
def _get_schedule_info(dag: DAG) -> str | None:
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
Review Comment:
```suggestion
if getattr(dag, 'timetable', None):
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
Review Comment:
```suggestion
# Preset schedules like @daily, @hourly
preset_map = {
'@once': 'Once',
'@hourly': 'Hourly (0 * * * *)',
'@daily': 'Daily (0 0 * * *)',
'@weekly': 'Weekly (0 0 * * 0)',
'@monthly': 'Monthly (0 0 1 * *)',
'@yearly': 'Yearly (0 0 1 1 *)',
'@annually': 'Annually (0 0 1 1 *)',
}
return preset_map.get(schedule, f"Cron: `{schedule}`")
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
Review Comment:
When would `TypeError` or `AttributeError` actually be raised here?
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
Review Comment:
```suggestion
# Get specific Dag
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
Review Comment:
```suggestion
# Get all Dags
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
+ dagbag = DagBag(process_subdir(args.subdir))
+ if dagbag.import_errors:
+ from rich import print as rich_print
+
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+ dags = sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id"))
+
+ # Generate markdown output
+ markdown_output = []
+
+ if len(dags) > 1:
+ markdown_output.append("# DAG Documentation\n")
+
+ for dag in dags:
+ doc_md = getattr(dag, '__doc_md__', None) or getattr(dag, 'doc_md', None)
Review Comment:
```suggestion
doc_md = getattr(dag, '__doc_md__', getattr(dag, 'doc_md', None))
```
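No strong opinion on this one. Note that the two forms are not strictly equivalent: the nested `getattr` only falls back to `doc_md` when `__doc_md__` is missing, whereas the `or` form also falls back when `__doc_md__` is `None` or empty.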
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
Review Comment:
Out of curiosity, why do we only want to show the first 3?
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
+ dagbag = DagBag(process_subdir(args.subdir))
+ if dagbag.import_errors:
+ from rich import print as rich_print
+
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+ dags = sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id"))
+
+ # Generate markdown output
+ markdown_output = []
Review Comment:
```suggestion
markdown_output: list[str] = []
```
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -648,6 +649,149 @@ def dag_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> No
raise SystemExit("DagRun failed")
+def _get_schedule_info(dag) -> str | None:
+ """Extract and format schedule information from a DAG."""
+ try:
+ # Try to get schedule from different possible attributes
+ schedule = getattr(dag, 'schedule', None) or getattr(dag,
'schedule_interval', None)
+
+ if schedule is None:
+ return "Manual (no schedule)"
+
+ # Handle different schedule types
+ if isinstance(schedule, str):
+ # Cron expressions or preset schedules
+ if schedule.startswith('@'):
+ # Preset schedules like @daily, @hourly
+ preset_map = {
+ '@once': 'Once',
+ '@hourly': 'Hourly (0 * * * *)',
+ '@daily': 'Daily (0 0 * * *)',
+ '@weekly': 'Weekly (0 0 * * 0)',
+ '@monthly': 'Monthly (0 0 1 * *)',
+ '@yearly': 'Yearly (0 0 1 1 *)',
+ '@annually': 'Annually (0 0 1 1 *)',
+ }
+ return preset_map.get(schedule, schedule)
+ else:
+ # Cron expression
+ return f"Cron: `{schedule}`"
+
+ # Handle timedelta objects
+ if hasattr(schedule, 'total_seconds'):
+ seconds = schedule.total_seconds()
+ if seconds >= 86400: # 24 hours
+ days = int(seconds // 86400)
+ return f"Every {days} day{'s' if days > 1 else ''}"
+ elif seconds >= 3600: # 1 hour
+ hours = int(seconds // 3600)
+ return f"Every {hours} hour{'s' if hours > 1 else ''}"
+ elif seconds >= 60: # 1 minute
+ minutes = int(seconds // 60)
+ return f"Every {minutes} minute{'s' if minutes > 1 else ''}"
+ else:
+ return f"Every {int(seconds)} second{'s' if seconds > 1 else
''}"
+
+ # Handle timetable objects
+ if hasattr(dag, 'timetable') and dag.timetable:
+ timetable = dag.timetable
+ # Try to get a description from the timetable
+ if hasattr(timetable, 'description'):
+ return timetable.description
+ elif hasattr(timetable, 'summary'):
+ return timetable.summary
+ else:
+ return str(timetable.__class__.__name__)
+
+ # Handle dataset schedules
+ if hasattr(schedule, '__iter__') and not isinstance(schedule, str):
+ # Likely a dataset schedule (list/set of datasets)
+ try:
+ datasets = list(schedule)
+ if datasets:
+ dataset_names = [str(ds) for ds in datasets[:3]] # Show
first 3
+ if len(datasets) > 3:
+ return f"Dataset-triggered: {',
'.join(dataset_names)}, ..."
+ else:
+ return f"Dataset-triggered: {', '.join(dataset_names)}"
+ except (TypeError, AttributeError):
+ pass
+
+ # Fallback to string representation
+ return str(schedule)
+
+ except Exception:
+ # If anything fails, return None to skip schedule info
+ return None
+
+
+@cli_utils.action_cli
+@suppress_logs_and_warning
+@providers_configuration_loaded
+def dag_docs(args) -> None:
+ """Display DAG documentation (__doc_md__) at the command line."""
+ if args.dag_id:
+ # Get specific DAG
+ dag = get_dag(args.subdir, args.dag_id)
+ dags = [dag]
+ else:
+ # Get all DAGs
+ dagbag = DagBag(process_subdir(args.subdir))
+ if dagbag.import_errors:
+ from rich import print as rich_print
+
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+ dags = sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id"))
+
+ # Generate markdown output
+ markdown_output = []
+
+ if len(dags) > 1:
+ markdown_output.append("# DAG Documentation\n")
+
+ for dag in dags:
+ doc_md = getattr(dag, '__doc_md__', None) or getattr(dag, 'doc_md',
None)
+
+ # Add DAG header
Review Comment:
```suggestion
# Add Dag header
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]