kaxil commented on code in PR #44972:
URL: https://github.com/apache/airflow/pull/44972#discussion_r1888196847
##########
airflow/dag_processing/processor.py:
##########
@@ -16,610 +16,229 @@
# under the License.
from __future__ import annotations
-import importlib
-import logging
import os
-import signal
-import threading
-import time
-import zipfile
-from collections.abc import Generator, Iterable
-from contextlib import contextmanager, redirect_stderr, redirect_stdout,
suppress
-from dataclasses import dataclass
-from typing import TYPE_CHECKING
-
-from setproctitle import setproctitle
-from sqlalchemy import event
-
-from airflow import settings
+import sys
+import traceback
+from collections.abc import Generator
+from typing import TYPE_CHECKING, Annotated, Callable, Literal, Union
+
+import attrs
+import pydantic
+
from airflow.callbacks.callback_requests import (
+ CallbackRequest,
DagCallbackRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
-from airflow.exceptions import AirflowException
-from airflow.models.dag import DAG
from airflow.models.dagbag import DagBag
-from airflow.models.pool import Pool
-from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstance, _run_finished_callback
+from airflow.sdk.execution_time.comms import GetConnection, GetVariable
+from airflow.sdk.execution_time.supervisor import WatchedSubprocess
+from airflow.serialization.serialized_objects import LazyDeserializedDAG,
SerializedDAG
from airflow.stats import Stats
-from airflow.utils import timezone
-from airflow.utils.file import iter_airflow_imports, might_contain_dag
-from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter,
set_context
-from airflow.utils.mixins import MultiprocessingStartMethodMixin
-from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- import multiprocessing
- from datetime import datetime
- from multiprocessing.connection import Connection as
MultiprocessingConnection
-
- from sqlalchemy.orm.session import Session
-
- from airflow.callbacks.callback_requests import CallbackRequest
- from airflow.models.operator import Operator
+ from airflow.typing_compat import Self
+ from airflow.utils.context import Context
+
+
+def _parse_file_entrypoint():
+ import os
+
+ import structlog
+
+ from airflow.sdk.execution_time import task_runner
+ # Parse DAG file, send JSON back up!
+
+ comms_decoder = task_runner.CommsDecoder[DagFileParseRequest,
DagFileParsingResult](
+ input=sys.stdin,
+ decoder=pydantic.TypeAdapter[DagFileParseRequest](DagFileParseRequest),
+ )
+ msg = comms_decoder.get_message()
+ comms_decoder.request_socket = os.fdopen(msg.requests_fd, "wb",
buffering=0)
+
+ log = structlog.get_logger(logger_name="task")
+
+ result = _parse_file(msg, log)
+ comms_decoder.send_request(log, result)
+
+
+def _parse_file(msg: DagFileParseRequest, log):
+ # TODO: Set known_pool names on DagBag!
Review Comment:
I would love it if we could find a better place for checks like that. Fine
for now, optimization for later.
E.g. `DagBag.run_known_checks()` or something similar, where we could handle
known_pool and other such checks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]