mik-laj commented on a change in pull request #10677:
URL: https://github.com/apache/airflow/pull/10677#discussion_r482189488



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -378,6 +379,47 @@ def dag_list_dag_runs(args, dag=None):
     print(table)
 
 
+@provide_session
+@cli_utils.action_logging
+def generate_pod_yaml(args, session=None):
+    """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor"""
+
+    from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
+    from airflow.kubernetes import pod_generator
+    from airflow.kubernetes.pod_generator import PodGenerator
+    from airflow.kubernetes.worker_configuration import WorkerConfiguration
+    execution_date = datetime.datetime(2020,11,3)
+    dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
+    yaml_output_path = args.output_path or "/tmp/airflow_generated_yaml/"
+    kube_config = KubeConfig()
+    for task in dag.tasks:
+        ti = TaskInstance(task, execution_date)
+        pod = PodGenerator.construct_pod(
+            dag_id=args.dag_id,
+            task_id=ti.task_id,
+            pod_id=AirflowKubernetesScheduler._create_pod_id(args.dag_id, ti.task_id),
+            try_number=ti.try_number,
+            date=ti.execution_date,
+            command=ti.command_as_list(),
+            kube_executor_config=PodGenerator.from_obj(ti.executor_config),
+            worker_uuid="worker-config",
+            namespace=kube_config.executor_namespace,
+            worker_config=WorkerConfiguration(kube_config=kube_config).as_pod()
+        )
+        import os
+
+        import yaml
+        from kubernetes.client.api_client import ApiClient
+        api_client = ApiClient()
+        date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
+        yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
+        os.makedirs(os.path.dirname(yaml_output_path), exist_ok=True)
+        with open(yaml_output_path + yaml_file_name, "w") as output:
+            sanitized_pod = api_client.sanitize_for_serialization(pod)
+            output.write(yaml.dump(sanitized_pod))
+    print(f"YAML output can be found at {yaml_output_path}")

Review comment:
   ```suggestion
       print(f"YAML output can be found at {yaml_output_path}", file=sys.stderr)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to